You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by md...@apache.org on 2021/02/04 01:39:22 UTC

[lucene-solr] branch master updated: SOLR-15122 Replace sleeps with phaser await (#2291)

This is an automated email from the ASF dual-hosted git repository.

mdrob pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fccdfe  SOLR-15122 Replace sleeps with phaser await (#2291)
8fccdfe is described below

commit 8fccdfe3530cd556270af08e1b51f13f8a1d8eb2
Author: Mike Drob <md...@apache.org>
AuthorDate: Wed Feb 3 19:39:04 2021 -0600

    SOLR-15122 Replace sleeps with phaser await (#2291)
---
 .../impl/DelegatingClusterEventProducer.java       | 24 ++++++---
 .../impl/DelegatingPlacementPluginFactory.java     | 26 +++++++---
 .../cluster/events/ClusterEventProducerTest.java   | 51 +++++--------------
 .../impl/PlacementPluginIntegrationTest.java       | 59 ++++++----------------
 4 files changed, 64 insertions(+), 96 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
index df380f5..4853569 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Set;
+import java.util.concurrent.Phaser;
 
 /**
  * This implementation allows Solr to dynamically change the underlying implementation
@@ -40,7 +41,7 @@ public final class DelegatingClusterEventProducer extends ClusterEventProducerBa
 
   private ClusterEventProducer delegate;
   // support for tests to make sure the update is completed
-  private volatile int version;
+  private volatile Phaser phaser;
 
   public DelegatingClusterEventProducer(CoreContainer cc) {
     super(cc);
@@ -56,6 +57,16 @@ public final class DelegatingClusterEventProducer extends ClusterEventProducerBa
     super.close();
   }
 
+  /**
+   * A phaser that will advance phases every time {@link #setDelegate(ClusterEventProducer)} is called.
+   * Useful for allowing tests to know when a new delegate is finished getting set.
+   */
+  @VisibleForTesting
+  public void setDelegationPhaser(Phaser phaser) {
+    phaser.register();
+    this.phaser = phaser;
+  }
+
   public void setDelegate(ClusterEventProducer newDelegate) {
     if (log.isDebugEnabled()) {
       log.debug("--setting new delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), newDelegate);
@@ -90,7 +101,11 @@ public final class DelegatingClusterEventProducer extends ClusterEventProducerBa
         log.debug("--- delegate {} already in state {}", delegate, delegate.getState());
       }
     }
-    this.version++;
+    Phaser localPhaser = phaser; // volatile read
+    if (localPhaser != null) {
+      assert localPhaser.getRegisteredParties() == 1;
+      localPhaser.arrive(); // we should be the only ones registered, so this will advance phase each time
+    }
   }
 
   @Override
@@ -142,9 +157,4 @@ public final class DelegatingClusterEventProducer extends ClusterEventProducerBa
     delegate.stop();
     state = delegate.getState();
   }
-
-  @VisibleForTesting
-  public int getVersion() {
-    return version;
-  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/DelegatingPlacementPluginFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/DelegatingPlacementPluginFactory.java
index 7ebb91e..0b813ad 100644
--- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/DelegatingPlacementPluginFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/DelegatingPlacementPluginFactory.java
@@ -21,14 +21,15 @@ import org.apache.solr.cluster.placement.PlacementPlugin;
 import org.apache.solr.cluster.placement.PlacementPluginConfig;
 import org.apache.solr.cluster.placement.PlacementPluginFactory;
 
+import java.util.concurrent.Phaser;
+
 /**
  * Helper class to support dynamic reloading of plugin implementations.
  */
 public final class DelegatingPlacementPluginFactory implements PlacementPluginFactory<PlacementPluginFactory.NoConfig> {
-
   private volatile PlacementPluginFactory<? extends PlacementPluginConfig> delegate;
   // support for tests to make sure the update is completed
-  private volatile int version;
+  private volatile Phaser phaser;
 
   @Override
   public PlacementPlugin createPluginInstance() {
@@ -39,18 +40,27 @@ public final class DelegatingPlacementPluginFactory implements PlacementPluginFa
     }
   }
 
+  /**
+   * A phaser that will advance phases every time {@link #setDelegate(PlacementPluginFactory)} is called.
+   * Useful for allowing tests to know when a new delegate is finished getting set.
+   */
+  @VisibleForTesting
+  public void setDelegationPhaser(Phaser phaser) {
+    phaser.register();
+    this.phaser = phaser;
+  }
+
   public void setDelegate(PlacementPluginFactory<? extends PlacementPluginConfig> delegate) {
     this.delegate = delegate;
-    this.version++;
+    Phaser localPhaser = phaser; // volatile read
+    if (localPhaser != null) {
+      assert localPhaser.getRegisteredParties() == 1;
+      localPhaser.arrive(); // we should be the only ones registered, so this will advance phase each time
+    }
   }
 
   @VisibleForTesting
   public PlacementPluginFactory<? extends PlacementPluginConfig> getDelegate() {
     return delegate;
   }
-
-  @VisibleForTesting
-  public int getVersion() {
-    return version;
-  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
index d751b44..0ac7c4f 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
@@ -23,15 +23,12 @@ import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.client.solrj.request.beans.PluginMeta;
 import org.apache.solr.client.solrj.response.V2Response;
 import org.apache.solr.cloud.ClusterSingleton;
-import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.cluster.events.impl.DefaultClusterEventProducer;
 import org.apache.solr.cluster.events.impl.DelegatingClusterEventProducer;
 import org.apache.solr.common.cloud.ClusterProperties;
-import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.LogLevel;
-import org.apache.solr.util.TimeOut;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -47,8 +44,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static java.util.Collections.singletonMap;
 import static org.apache.solr.client.solrj.SolrRequest.METHOD.GET;
@@ -59,9 +56,8 @@ import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
  */
 @LogLevel("org.apache.solr.cluster.events=DEBUG")
 public class ClusterEventProducerTest extends SolrCloudTestCase {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
   private AllEventsListener eventsListener;
+  private Phaser phaser;
 
   @BeforeClass
   public static void setupCluster() throws Exception {
@@ -77,6 +73,12 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
     cluster.deleteAllCollections();
     eventsListener = new AllEventsListener();
     cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener);
+    ClusterEventProducer clusterEventProducer = cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer();
+    assertTrue("not a delegating producer? " + clusterEventProducer.getClass(),
+            clusterEventProducer instanceof DelegatingClusterEventProducer);
+    DelegatingClusterEventProducer wrapper = (DelegatingClusterEventProducer) clusterEventProducer;
+    phaser = new Phaser();
+    wrapper.setDelegationPhaser(phaser);
   }
 
   @After
@@ -102,7 +104,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
 
   @Test
   public void testEvents() throws Exception {
-    int version = waitForVersionChange(-1, 10);
+    int version = phaser.getPhase();
 
     PluginMeta plugin = new PluginMeta();
     plugin.klass = DefaultClusterEventProducer.class.getName();
@@ -114,7 +116,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
     V2Response rsp = req.process(cluster.getSolrClient());
     assertEquals(0, rsp.getStatus());
 
-    version = waitForVersionChange(version, 10);
+    phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
 
     // NODES_DOWN
 
@@ -281,7 +283,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
 
   @Test
   public void testListenerPlugins() throws Exception {
-    int version = waitForVersionChange(-1, 10);
+    int version = phaser.getPhase();
 
     PluginMeta plugin = new PluginMeta();
     plugin.klass = DefaultClusterEventProducer.class.getName();
@@ -292,7 +294,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
         .build();
     V2Response rsp = req.process(cluster.getSolrClient());
     assertEquals(0, rsp.getStatus());
-    version = waitForVersionChange(-1, 10);
+    version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
 
     plugin = new PluginMeta();
     plugin.name = "testplugin";
@@ -350,7 +352,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
         .withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
         .build();
     req.process(cluster.getSolrClient());
-    version = waitForVersionChange(-1, 10);
+    version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
 
     dummyEventLatch = new CountDownLatch(1);
     lastEvent = null;
@@ -371,7 +373,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
         .build();
     rsp = req.process(cluster.getSolrClient());
     assertEquals(0, rsp.getStatus());
-    version = waitForVersionChange(-1, 10);
+    phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
 
     dummyEventLatch = new CountDownLatch(1);
     lastEvent = null;
@@ -384,29 +386,4 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
     assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent);
     assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType());
   }
-
-  private int waitForVersionChange(int currentVersion, int timeoutSec) throws Exception {
-    TimeOut timeout = new TimeOut(timeoutSec, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-    Overseer overseer = cluster.getOpenOverseer();
-    if (overseer == null) {
-      throw new Exception("no overseer");
-    }
-    ClusterEventProducer clusterEventProducer = overseer.getCoreContainer().getClusterEventProducer();
-    assertTrue("not a delegating producer? " + clusterEventProducer.getClass(),
-        clusterEventProducer instanceof DelegatingClusterEventProducer);
-    DelegatingClusterEventProducer wrapper = (DelegatingClusterEventProducer) clusterEventProducer;
-    while (!timeout.hasTimedOut()) {
-      int newVersion = wrapper.getVersion();
-      if (newVersion < currentVersion) {
-        throw new Exception("Invalid version - went back! currentVersion=" + currentVersion +
-            " newVersion=" + newVersion);
-      } else if (currentVersion < newVersion) {
-        log.debug("--current version was {}, new version is {}", currentVersion, newVersion);
-        return newVersion;
-      }
-      timeout.sleep(200);
-    }
-    throw new TimeoutException("version didn't change in time, currentVersion=" + currentVersion);
-
-  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java b/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java
index e699af6..dd89bf9 100644
--- a/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java
@@ -41,24 +41,20 @@ import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cluster.placement.plugins.MinimizeCoresPlacementFactory;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.util.LogLevel;
-import org.apache.solr.util.TimeOut;
 
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -70,8 +66,6 @@ import static java.util.Collections.singletonMap;
  */
 @LogLevel("org.apache.solr.cluster.placement.impl=DEBUG")
 public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
   private static final String COLLECTION = PlacementPluginIntegrationTest.class.getSimpleName() + "_collection";
 
   private static SolrCloudManager cloudManager;
@@ -145,14 +139,15 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
   }
 
   @Test
-  @SuppressWarnings("unchecked")
   public void testDynamicReconfiguration() throws Exception {
     PlacementPluginFactory<? extends PlacementPluginConfig> pluginFactory = cc.getPlacementPluginFactory();
     assertTrue("wrong type " + pluginFactory.getClass().getName(), pluginFactory instanceof DelegatingPlacementPluginFactory);
     DelegatingPlacementPluginFactory wrapper = (DelegatingPlacementPluginFactory) pluginFactory;
+    Phaser phaser = new Phaser();
+    wrapper.setDelegationPhaser(phaser);
 
-    int version = wrapper.getVersion();
-    log.debug("--initial version={}", version);
+    int version = phaser.getPhase();
+    assertTrue("wrong version " + version, version > -1);
 
     PluginMeta plugin = new PluginMeta();
     plugin.name = PlacementPluginFactory.PLUGIN_NAME;
@@ -164,9 +159,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
         .build();
     req.process(cluster.getSolrClient());
 
-    version = waitForVersionChange(version, wrapper, 10);
-
-    assertTrue("wrong version " + version, version > 0);
+    version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
     PlacementPluginFactory<? extends PlacementPluginConfig> factory = wrapper.getDelegate();
     assertTrue("wrong type " + factory.getClass().getName(), factory instanceof MinimizeCoresPlacementFactory);
 
@@ -180,7 +173,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
         .build();
     req.process(cluster.getSolrClient());
 
-    version = waitForVersionChange(version, wrapper, 10);
+    version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
 
     factory = wrapper.getDelegate();
     assertTrue("wrong type " + factory.getClass().getName(), factory instanceof AffinityPlacementFactory);
@@ -197,7 +190,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
         .build();
     req.process(cluster.getSolrClient());
 
-    version = waitForVersionChange(version, wrapper, 10);
+    version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
     factory = wrapper.getDelegate();
     assertTrue("wrong type " + factory.getClass().getName(), factory instanceof AffinityPlacementFactory);
     config = ((AffinityPlacementFactory) factory).getConfig();
@@ -212,14 +205,8 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
         .withPayload(singletonMap("add", plugin))
         .build();
     req.process(cluster.getSolrClient());
-    try {
-      int newVersion = waitForVersionChange(version, wrapper, 5);
-      if (newVersion != version) {
-        fail("factory configuration updated but plugin name was wrong: " + plugin);
-      }
-    } catch (TimeoutException te) {
-      // expected
-    }
+    final int oldVersion = version;
+    expectThrows(TimeoutException.class, () -> phaser.awaitAdvanceInterruptibly(oldVersion, 5, TimeUnit.SECONDS));
     // remove plugin
     req = new V2Request.Builder("/cluster/plugin")
         .forceV2(true)
@@ -227,7 +214,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
         .withPayload("{remove: '" + PlacementPluginFactory.PLUGIN_NAME + "'}")
         .build();
     req.process(cluster.getSolrClient());
-    waitForVersionChange(version, wrapper, 10);
+    phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
     factory = wrapper.getDelegate();
     assertNull("no factory should be present", factory);
   }
@@ -237,9 +224,10 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
     PlacementPluginFactory<? extends PlacementPluginConfig> pluginFactory = cc.getPlacementPluginFactory();
     assertTrue("wrong type " + pluginFactory.getClass().getName(), pluginFactory instanceof DelegatingPlacementPluginFactory);
     DelegatingPlacementPluginFactory wrapper = (DelegatingPlacementPluginFactory) pluginFactory;
+    Phaser phaser = new Phaser();
+    wrapper.setDelegationPhaser(phaser);
 
-    int version = wrapper.getVersion();
-    log.debug("--initial version={}", version);
+    int version = phaser.getPhase();
 
     Set<String> nodeSet = new HashSet<>();
     for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
@@ -261,7 +249,7 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
         .build();
     req.process(cluster.getSolrClient());
 
-    version = waitForVersionChange(version, wrapper, 10);
+    phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
 
     CollectionAdminResponse rsp = CollectionAdminRequest.createCollection(SECONDARY_COLLECTION, "conf", 1, 3)
         .process(cluster.getSolrClient());
@@ -398,21 +386,4 @@ public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
       });
     });
   }
-
-  private int waitForVersionChange(int currentVersion, DelegatingPlacementPluginFactory wrapper, int timeoutSec) throws Exception {
-    TimeOut timeout = new TimeOut(timeoutSec, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-
-    while (!timeout.hasTimedOut()) {
-      int newVersion = wrapper.getVersion();
-      if (newVersion < currentVersion) {
-        throw new Exception("Invalid version - went back! currentVersion=" + currentVersion +
-            " newVersion=" + newVersion);
-      } else if (currentVersion < newVersion) {
-        log.debug("--current version was {}, new version is {}", currentVersion, newVersion);
-        return newVersion;
-      }
-      timeout.sleep(200);
-    }
-    throw new TimeoutException("version didn't change in time, currentVersion=" + currentVersion);
-  }
 }