Posted to commits@lucene.apache.org by ho...@apache.org on 2018/12/11 17:53:46 UTC

[2/2] lucene-solr:branch_7x: SOLR-13054: rewrite TriggerSetPropertiesIntegrationTest

SOLR-13054: rewrite TriggerSetPropertiesIntegrationTest

test no longer depends on changing static non-final non-volatile variables used by multiple threads

test also no longer depends on arbitrary sleep calls, instead threads await/poll on concurrent signaling objects/queues
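
For illustration only, here is a minimal sketch of that signaling pattern (hypothetical class and
method names, not part of this patch): the trigger publishes into a final BlockingQueue and the
test thread polls it with a timeout, so there is no re-assigned static latch and no Thread.sleep():

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class SignalingSketch {
      // shared through a final field, so no unsafe publication via a static
      // non-final, non-volatile variable
      private final BlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(2);

      // what a trigger's run() would do: record that it fired (No-Op once the queue is full)
      void onTriggerRun() {
        timestamps.offer(System.nanoTime());
      }

      // what the test thread would do: block until a signal arrives or the timeout expires;
      // a null return means the poll timed out and the caller can fail the test
      Long awaitNextRun(long timeout, TimeUnit unit) throws InterruptedException {
        return timestamps.poll(timeout, unit);
      }
    }

The actual test below applies the same idea using ArrayBlockingQueue, CyclicBarrier and Semaphore
from java.util.concurrent.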

(cherry picked from commit 3147c131e0ce997259f7bcf31e655d43dd99ef59)


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c5426ecd
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c5426ecd
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c5426ecd

Branch: refs/heads/branch_7x
Commit: c5426ecd1807ab75f86b5932b64944fc7df85718
Parents: 2b62f8b
Author: Chris Hostetter <ho...@apache.org>
Authored: Tue Dec 11 10:38:36 2018 -0700
Committer: Chris Hostetter <ho...@apache.org>
Committed: Tue Dec 11 10:39:30 2018 -0700

----------------------------------------------------------------------
 .../TriggerSetPropertiesIntegrationTest.java    | 249 ++++++++++++-------
 1 file changed, 158 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c5426ecd/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
index 0ee0e1c..c59e60b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
@@ -20,13 +20,17 @@ package org.apache.solr.cloud.autoscaling;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
@@ -50,8 +54,6 @@ import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSourc
 public class TriggerSetPropertiesIntegrationTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
-
   @BeforeClass
   public static void setupCluster() throws Exception {
     configureCluster(2)
@@ -67,134 +69,199 @@ public class TriggerSetPropertiesIntegrationTest extends SolrCloudTestCase {
     assertEquals(response.get("result").toString(), "success");
   }
 
-  private static CountDownLatch getTriggerFiredLatch() {
-    return triggerFiredLatch;
-  }
-
+  /** 
+   * Test that we can add/remove triggers to a scheduler, change the config on the fly, and still get the
+   * expected behavior.
+   */
   public void testSetProperties() throws Exception {
-    JettySolrRunner runner = cluster.getJettySolrRunner(0);
-    SolrResourceLoader resourceLoader = runner.getCoreContainer().getResourceLoader();
-    SolrCloudManager solrCloudManager = runner.getCoreContainer().getZkController().getSolrCloudManager();
-    AtomicLong diff = new AtomicLong(0);
-    triggerFiredLatch = new CountDownLatch(2); // have the trigger run twice to capture time difference
+    final JettySolrRunner runner = cluster.getJettySolrRunner(0);
+    final SolrResourceLoader resourceLoader = runner.getCoreContainer().getResourceLoader();
+    final SolrCloudManager solrCloudManager = runner.getCoreContainer().getZkController().getSolrCloudManager();
+    
     try (ScheduledTriggers scheduledTriggers = new ScheduledTriggers(resourceLoader, solrCloudManager)) {
       AutoScalingConfig config = new AutoScalingConfig(Collections.emptyMap());
       scheduledTriggers.setAutoScalingConfig(config);
-      AutoScaling.Trigger t = new TriggerBase(TriggerEventType.NODELOST, "x") {
-        @Override
-        protected Map<String, Object> getState() {
-          return Collections.singletonMap("x", "y");
-        }
-
-        @Override
-        protected void setState(Map<String, Object> state) {
-
-        }
-
-        @Override
-        public void restoreState(AutoScaling.Trigger old) {
-
-        }
 
+      // Set up a trigger that records a timestamp each time it runs
+      // we only need 2 timestamps for the test, so limit the queue and make the trigger a No-Op if full
+      final BlockingQueue<Long> timestamps = new ArrayBlockingQueue<Long>(2);
+      final AutoScaling.Trigger t1 = new MockTrigger(TriggerEventType.NODELOST, "mock-timestamper") {
         @Override
         public void run() {
-          if (getTriggerFiredLatch().getCount() == 0) return;
-          long l = diff.get();
-          diff.set(timeSource.getTimeNs() - l);
-          getTriggerFiredLatch().countDown();
+          log.info("Running {} in {}", this.getName(), Thread.currentThread().getName());
+          timestamps.offer(timeSource.getTimeNs());
         }
       };
-      t.configure(runner.getCoreContainer().getResourceLoader(), runner.getCoreContainer().getZkController().getSolrCloudManager(), Collections.emptyMap());
-      scheduledTriggers.add(t);
 
-      assertTrue(getTriggerFiredLatch().await(4, TimeUnit.SECONDS));
-      assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS) >= 0);
+      log.info("Configuring simple scheduler and adding trigger: {}", t1.getName());
+      t1.configure(resourceLoader, solrCloudManager, Collections.emptyMap());
+      scheduledTriggers.add(t1);
 
-      // change schedule delay
-      config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS, 4));
+      waitForAndDiffTimestamps("conf(default delay)",
+                               ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS,
+                               timestamps);
+      
+      log.info("Reconfiguing scheduler to use 4s delay and clearing queue for trigger: {}", t1.getName());
+      config = config.withProperties(Collections.singletonMap
+                                     (AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS, 4));
       scheduledTriggers.setAutoScalingConfig(config);
-      triggerFiredLatch = new CountDownLatch(2);
-      assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(10, TimeUnit.SECONDS));
-      assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(4) >= 0);
+      timestamps.clear();
 
-      // reset with default properties
-      scheduledTriggers.remove("x"); // remove the old trigger
+      waitForAndDiffTimestamps("conf(four sec delay)", 
+                               4, TimeUnit.SECONDS, 
+                               timestamps);
+      
+      log.info("Removing trigger: {}", t1.getName());
+      scheduledTriggers.remove(t1.getName());
+      
+      log.info("Reconfiguing scheduler to use default props");
       config = config.withProperties(ScheduledTriggers.DEFAULT_PROPERTIES);
       scheduledTriggers.setAutoScalingConfig(config);
 
-      // test core thread count
-      List<AutoScaling.Trigger> triggerList = new ArrayList<>();
-      final Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
-      final Set<String> triggerNames = Collections.synchronizedSet(new HashSet<>());
-      triggerFiredLatch = new CountDownLatch(8);
-      for (int i = 0; i < 8; i++) {
-        AutoScaling.Trigger trigger = new MockTrigger(TriggerEventType.NODELOST, "x" + i)  {
+                 
+      assertTrue("Test sanity check, need default thread pool to be at least 3 so we can" +
+                 "test lowering it by 2", ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE >= 3);
+      final int numTriggers = ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE;
+      final int reducedThreadPoolSize = numTriggers - 2;
+      
+      // Set up X instances of a trigger that:
+      //  - records its name the first time it runs
+      //    - treating that first run as a No-Op (all remaining execution is skipped)
+      //  - on later runs, records the name of the thread that ran it
+      //  - blocks on a cyclic barrier until at least Y instances have run (to hog a thread)
+      // ...to test that the scheduler will add new threads as needed, up to the configured limit
+      //
+      // NOTE: the reason we need X unique instances is that the scheduler won't "re-run" a single
+      // trigger while a previous "run" is still in progress
+      final List<AutoScaling.Trigger> triggerList = new ArrayList<>(numTriggers);
+      
+      // Use a cyclic barrier gated by an atomic ref so we can swap it out later
+      final AtomicReference<CyclicBarrier> latch = new AtomicReference<>(new CyclicBarrier(numTriggers));
+      
+      // variables for tracking state as we go
+      // NOTE: all read/write must be gated by synchronizing on the barrier (ref),
+      //       so we can ensure we are reading a consistent view
+      final Set<String> threadNames = Collections.synchronizedSet(new LinkedHashSet<>());
+      final Set<String> triggerNames = Collections.synchronizedSet(new LinkedHashSet<>());
+      final AtomicLong fails = new AtomicLong(0);
+
+      // Use a semaphore to track when each trigger *finishes* so our test thread
+      // can know when to check & clear the tracking state
+      final Semaphore completionSemaphore = new Semaphore(numTriggers);
+      
+      for (int i = 0; i < numTriggers; i++) {
+        AutoScaling.Trigger trigger = new MockTrigger(TriggerEventType.NODELOST,
+                                                      "mock-blocking-trigger-" + i)  {
           @Override
           public void run() {
-            try {
-              // If core pool size is increased then new threads won't be started if existing threads
-              // aren't busy with tasks. So we make this thread wait longer than necessary
-              // so that the pool is forced to start threads for other triggers
-              Thread.sleep(5000);
-            } catch (InterruptedException e) {
-            }
-            if (triggerNames.add(getName())) {
-              getTriggerFiredLatch().countDown();
+            log.info("Running {} in {}", this.getName(), Thread.currentThread().getName());
+            CyclicBarrier barrier = null;
+            synchronized (latch) {
+              if (triggerNames.add(this.getName())) {
+                log.info("{}: No-Op since we've already recorded a run", this.getName());
+                return;
+              }
               threadNames.add(Thread.currentThread().getName());
+              barrier = latch.get();
+            }
+            
+            try {
+              log.info("{}: waiting on barrier to hog a thread", this.getName());
+              barrier.await(30, TimeUnit.SECONDS);
+              completionSemaphore.release();
+            } catch (Exception e) {
+              fails.incrementAndGet();
+              log.error(this.getName() + ": failure waiting on cyclic barrier: " + e.toString(), e);
             }
           }
         };
+
         trigger.configure(resourceLoader, solrCloudManager, Collections.emptyMap());
         triggerList.add(trigger);
+        completionSemaphore.acquire();
+        log.info("Adding trigger {} to scheduler", trigger.getName());
         scheduledTriggers.add(trigger);
       }
-      assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
-      assertEquals("Expected 8 triggers but found: " + triggerNames, 8, triggerNames.size());
-      assertEquals("Expected " + ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE
-              + " threads but found: " + threadNames,
-          ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE, threadNames.size());
-
-      // change core pool size
-      config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_CORE_POOL_SIZE, 6));
-      scheduledTriggers.setAutoScalingConfig(config);
-      triggerFiredLatch = new CountDownLatch(8);
-      threadNames.clear();
-      triggerNames.clear();
-      assertTrue(getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
-      assertEquals("Expected 8 triggers but found: " + triggerNames, 8, triggerNames.size());
-      assertEquals("Expected 6 threads but found: " + threadNames, 6, threadNames.size());
-
-      // reset
-      for (int i = 0; i < 8; i++) {
-        scheduledTriggers.remove(triggerList.get(i).getName());
+      
+      log.info("Waiting on semaphore for all triggers to signal completion...");
+      assertTrue("Timed out waiting for semaphore count to be released",
+                 completionSemaphore.tryAcquire(numTriggers, 60, TimeUnit.SECONDS));
+                                                
+      synchronized (latch) {
+        assertEquals("Unexpected number of trigger names found: " + triggerNames.toString(),
+                     numTriggers, triggerNames.size());
+        assertEquals("Unexpected number of thread ames found: " + threadNames.toString(),
+                     numTriggers, threadNames.size());
+        assertEquals("Unexpected number of trigger fails recorded, check logs?",
+                     0, fails.get());
+
+        // before releasing the latch, clear the state and update our config to use a lower number of threads
+        log.info("Updating scheduler config to use {} threads", reducedThreadPoolSize);
+        config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_CORE_POOL_SIZE,
+                                                                reducedThreadPoolSize));
+        scheduledTriggers.setAutoScalingConfig(config);
+
+        log.info("Updating cyclic barrier and clearing test state so triggers will 'run' again");
+        latch.set(new CyclicBarrier(reducedThreadPoolSize));
+        threadNames.clear();
+        triggerNames.clear();
+      }
+      
+      log.info("Waiting on semaphore for all triggers to signal completion...");
+      assertTrue("Timed out waiting for semaphore count to be released",
+                 completionSemaphore.tryAcquire(numTriggers, 60, TimeUnit.SECONDS));
+      
+      synchronized (latch) {
+        assertEquals("Unexpected number of trigger names found: " + triggerNames.toString(),
+                     numTriggers, triggerNames.size());
+        assertEquals("Unexpected number of thread names found: " + threadNames.toString(),
+                    reducedThreadPoolSize, threadNames.size());
+        assertEquals("Unexpected number of trigger fails recorded, check logs?",
+                     0, fails.get());
       }
     }
   }
 
-  public static class MockTrigger extends TriggerBase {
+
+
+      
+  private static final void waitForAndDiffTimestamps(final String label,
+                                                     final long minExpectedDelta,
+                                                     final TimeUnit minExpectedDeltaUnit,
+                                                     final BlockingQueue<Long> timestamps) {
+    try {
+      log.info(label + ": Waiting for 2 timestamps to be recorded");
+      Long firstTs = timestamps.poll(minExpectedDelta * 3, minExpectedDeltaUnit);
+      assertNotNull(label + ": Couldn't get first timestampe after max allowed polling", firstTs);
+      Long secondTs = timestamps.poll(minExpectedDelta * 3, minExpectedDeltaUnit);
+      assertNotNull(label + ": Couldn't get second timestampe after max allowed polling", secondTs);
+      
+      final long deltaInNanos = secondTs - firstTs;
+      final long minExpectedDeltaInNanos = minExpectedDeltaUnit.toNanos(minExpectedDelta);
+      assertTrue(label + ": Delta between timestamps ("+secondTs+"ns - "+firstTs+"ns = "+deltaInNanos+"ns) is not " +
+                 "at least as much as min expected delay: " + minExpectedDeltaInNanos + "ns",
+                 deltaInNanos >= minExpectedDeltaInNanos);
+    } catch (InterruptedException e) {
+      log.error(label + ": interupted", e);
+      fail(label + ": interupted:" + e.toString());
+    }
+  }
+  
+  private static abstract class MockTrigger extends TriggerBase {
 
     public MockTrigger(TriggerEventType eventType, String name) {
       super(eventType, name);
     }
 
     @Override
-    protected Map<String, Object> getState() {
+    protected Map<String, Object> getState() { 
       return Collections.emptyMap();
     }
 
     @Override
-    protected void setState(Map<String, Object> state) {
-
-    }
-
-    @Override
-    public void restoreState(AutoScaling.Trigger old) {
-
-    }
+    protected void setState(Map<String, Object> state) {  /* No-Op */ }
 
     @Override
-    public void run() {
-
-    }
+    public void restoreState(AutoScaling.Trigger old) { /* No-Op */ }
   }
 }