You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dw...@apache.org on 2021/03/10 09:58:10 UTC

[lucene] 05/06: SOLR-14749: Move much of the singleton mgmgt into the utility class.

This is an automated email from the ASF dual-hosted git repository.

dweiss pushed a commit to branch jira/solr-14749-cluster-singleton
in repository https://gitbox.apache.org/repos/asf/lucene.git

commit e549845ee72b7d547713904d3217141ffacee22f
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Oct 19 13:40:53 2020 +0200

    SOLR-14749: Move much of the singleton mgmgt into the utility class.
---
 .../apache/solr/api/CustomContainerPlugins.java    |  49 ++-----
 .../src/java/org/apache/solr/cloud/Overseer.java   |  46 +-----
 .../org/apache/solr/core/ClusterSingletons.java    | 162 +++++++++++++++++++++
 .../java/org/apache/solr/core/CoreContainer.java   |  41 ++----
 .../test/org/apache/solr/cloud/OverseerTest.java   |   3 +-
 5 files changed, 186 insertions(+), 115 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
index 616fb9d..5f4d9a7 100644
--- a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
+++ b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
@@ -34,7 +34,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.lucene.util.ResourceLoaderAware;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.request.beans.PluginMeta;
-import org.apache.solr.cloud.ClusterSingleton;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.annotation.JsonProperty;
@@ -57,9 +56,10 @@ import static org.apache.lucene.util.IOUtils.closeWhileHandlingException;
 import static org.apache.solr.common.util.Utils.makeMap;
 
 public class CustomContainerPlugins implements ClusterPropertiesListener, MapWriter {
-  private final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
+
   private final List<PluginRegistryListener> listeners = new CopyOnWriteArrayList<>();
 
   final CoreContainer coreContainer;
@@ -124,7 +124,6 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
         ApiInfo apiInfo = currentPlugins.remove(e.getKey());
         if (apiInfo == null) continue;
         listeners.forEach(listener -> listener.deleted(apiInfo));
-        handleClusterSingleton(null, apiInfo);
         for (ApiHolder holder : apiInfo.holders) {
           Api old = containerApiBag.unregister(holder.api.getEndPoint().method()[0],
               getActualPath(apiInfo, holder.api.getEndPoint().path()[0]));
@@ -156,7 +155,6 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
           currentPlugins.put(e.getKey(), apiInfo);
           final ApiInfo apiInfoFinal = apiInfo;
           listeners.forEach(listener -> listener.added(apiInfoFinal));
-          handleClusterSingleton(apiInfo, null);
         } else {
           //this plugin is being updated
           ApiInfo old = currentPlugins.put(e.getKey(), apiInfo);
@@ -166,7 +164,6 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
           }
           final ApiInfo apiInfoFinal = apiInfo;
           listeners.forEach(listener -> listener.modified(old, apiInfoFinal));
-          handleClusterSingleton(apiInfo, old);
           if (old != null) {
             //this is an update of the plugin. But, it is possible that
             // some paths are remved in the newer version of the plugin
@@ -187,36 +184,6 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
     }
   }
 
-  private void handleClusterSingleton(ApiInfo newApiInfo, ApiInfo oldApiInfo) {
-    if (newApiInfo != null) {
-      // register new api
-      Object instance = newApiInfo.getInstance();
-      if (instance instanceof ClusterSingleton) {
-        ClusterSingleton singleton = (ClusterSingleton) instance;
-        coreContainer.getClusterSingletons().getSingletons().put(singleton.getName(), singleton);
-        // check to see if we should immediately start this singleton
-        if (coreContainer.getZkController() != null &&
-            coreContainer.getZkController().getOverseer() != null &&
-            !coreContainer.getZkController().getOverseer().isClosed()) {
-          try {
-            singleton.start();
-          } catch (Exception exc) {
-            log.warn("Exception starting ClusterSingleton {}: {}", newApiInfo, exc);
-          }
-        }
-      }
-    }
-    if (oldApiInfo != null) {
-      // stop & unregister the old api
-      Object instance = oldApiInfo.getInstance();
-      if (instance instanceof ClusterSingleton) {
-        ClusterSingleton singleton = (ClusterSingleton) instance;
-        singleton.stop();
-        coreContainer.getClusterSingletons().getSingletons().remove(singleton.getName());
-      }
-    }
-  }
-
   private static String getActualPath(ApiInfo apiInfo, String path) {
     path = path.replaceAll("\\$path-prefix", apiInfo.info.pathPrefix);
     path = path.replaceAll("\\$plugin-name", apiInfo.info.name);
@@ -329,7 +296,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
       }
 
       try {
-        List<Api> apis = AnnotatedApi.getApis(klas, null, false);
+        List<Api> apis = AnnotatedApi.getApis(klas, null, true);
         for (Object api : apis) {
           EndPoint endPoint = ((AnnotatedApi) api).getEndPoint();
           if (endPoint.path().length > 1 || endPoint.method().length > 1) {
@@ -381,7 +348,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
         }
       }
       this.holders = new ArrayList<>();
-      for (Api api : AnnotatedApi.getApis(instance.getClass(), instance, false)) {
+      for (Api api : AnnotatedApi.getApis(instance.getClass(), instance, true)) {
         holders.add(new ApiHolder((AnnotatedApi) api));
       }
     }
@@ -421,12 +388,18 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
     return null;
   }
 
-  interface PluginRegistryListener {
+  /**
+   * Listener for notifications about added / deleted / modified plugins.
+   */
+  public interface PluginRegistryListener {
 
+    /** Called when a new plugin is added. */
     void added(ApiInfo plugin);
 
+    /** Called when an existing plugin is deleted. */
     void deleted(ApiInfo plugin);
 
+    /** Called when an existing plugin is replaced. */
     void modified(ApiInfo old, ApiInfo replacement);
 
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 4c206ae..fc0d0eb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,8 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 
 import org.apache.lucene.util.Version;
@@ -657,7 +655,7 @@ public class Overseer implements SolrCloseable {
       }
     });
 
-    startClusterSingletons();
+    getCoreContainer().getClusterSingletons().startClusterSingletons();
 
     assert ObjectReleaseTracker.track(this);
   }
@@ -781,50 +779,10 @@ public class Overseer implements SolrCloseable {
   /**
    * Start {@link ClusterSingleton} plugins when we become the leader.
    */
-  public void startClusterSingletons() {
-    CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
-    final Runnable initializer = () -> {
-      if (isClosed()) {
-        return;
-      }
-      try {
-        singletons.waitUntilReady(60, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        log.warn("Interrupted initialization of ClusterSingleton-s");
-        return;
-      } catch (TimeoutException te) {
-        log.warn("Timed out during initialization of ClusterSingleton-s");
-        return;
-      }
-      singletons.getSingletons().forEach((name, singleton) -> {
-        try {
-          singleton.start();
-        } catch (Exception e) {
-          log.warn("Exception starting ClusterSingleton {}: {}", singleton, e);
-        }
-      });
-    };
-    if (singletons.isReady()) {
-      // wait until all singleton-s are ready for the first startup
-      getCoreContainer().runAsync(initializer);
-    } else {
-      initializer.run();
-    }
-  }
 
   /**
    * Stop {@link ClusterSingleton} plugins when we lose leadership.
    */
-  private void stopClusterSingletons() {
-    CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
-    if (singletons == null) {
-      return;
-    }
-    singletons.getSingletons().forEach((name, singleton) -> {
-      singleton.stop();
-    });
-  }
-
   public Stats getStats() {
     return stats;
   }
@@ -866,7 +824,7 @@ public class Overseer implements SolrCloseable {
     }
     // stop singletons only on the leader
     if (!this.closed) {
-      stopClusterSingletons();
+      getCoreContainer().getClusterSingletons().stopClusterSingletons();
     }
     this.closed = true;
     doClose();
diff --git a/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java b/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java
new file mode 100644
index 0000000..c929e83
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.core;
+
+import org.apache.solr.api.CustomContainerPlugins;
+import org.apache.solr.cloud.ClusterSingleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Helper class to manage the initial registration of {@link ClusterSingleton} plugins and
+ * to track the changes in loaded plugins in {@link org.apache.solr.api.CustomContainerPlugins}.
+ */
+public class ClusterSingletons {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
+  private final Supplier<Boolean> runSingletons;
+  private final Consumer<Runnable> asyncRunner;
+  private final CustomContainerPlugins.PluginRegistryListener pluginListener;
+
+  // we use this latch to delay the initial startup of singletons, due to
+  // the leader election occurring in parallel with the rest of the load() method.
+  private final CountDownLatch readyLatch = new CountDownLatch(1);
+
+  /**
+   * Create a helper to manage singletons.
+   * @param runSingletons this function returns true when singletons should be running. It's
+   *                      used when adding or modifying existing plugins.
+   * @param asyncRunner async runner that will be used for starting up each singleton.
+   */
+  public ClusterSingletons(Supplier<Boolean> runSingletons, Consumer<Runnable> asyncRunner) {
+    this.runSingletons = runSingletons;
+    this.asyncRunner = asyncRunner;
+    pluginListener = new CustomContainerPlugins.PluginRegistryListener() {
+      @Override
+      public void added(CustomContainerPlugins.ApiInfo plugin) {
+        // register new api
+        Object instance = plugin.getInstance();
+        if (instance instanceof ClusterSingleton) {
+          ClusterSingleton singleton = (ClusterSingleton) instance;
+          singletonMap.put(singleton.getName(), singleton);
+          // check to see if we should immediately start this singleton
+          if (isReady() && runSingletons.get()) {
+            try {
+              singleton.start();
+            } catch (Exception exc) {
+              log.warn("Exception starting ClusterSingleton {}: {}", plugin, exc);
+            }
+          }
+        }
+      }
+
+      @Override
+      public void deleted(CustomContainerPlugins.ApiInfo plugin) {
+        Object instance = plugin.getInstance();
+        if (instance instanceof ClusterSingleton) {
+          ClusterSingleton singleton = (ClusterSingleton) instance;
+          singleton.stop();
+          singletonMap.remove(singleton.getName());
+        }
+      }
+
+      @Override
+      public void modified(CustomContainerPlugins.ApiInfo old, CustomContainerPlugins.ApiInfo replacement) {
+        added(replacement);
+        deleted(old);
+      }
+    };
+  }
+
+  public CustomContainerPlugins.PluginRegistryListener getPluginRegistryListener() {
+    return pluginListener;
+  }
+
+  public Map<String, ClusterSingleton> getSingletons() {
+    return singletonMap;
+  }
+
+  public boolean isReady() {
+    return readyLatch.getCount() == 0;
+  }
+
+  public void setReady() {
+    readyLatch.countDown();
+  }
+
+  public void waitUntilReady(long timeout, TimeUnit timeUnit)
+      throws InterruptedException, TimeoutException {
+    boolean await = readyLatch.await(timeout, timeUnit);
+    if (!await) {
+      throw new TimeoutException("Timed out waiting for ClusterSingletons to become ready.");
+    }
+  }
+
+  /**
+   * Start singletons when the helper is ready and when it's supposed to start
+   * (as determined by {@link #runSingletons} function).
+   */
+  public void startClusterSingletons() {
+    final Runnable initializer = () -> {
+      if (!runSingletons.get()) {
+        return;
+      }
+      try {
+        waitUntilReady(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        log.warn("Interrupted initialization of ClusterSingleton-s");
+        return;
+      } catch (TimeoutException te) {
+        log.warn("Timed out during initialization of ClusterSingleton-s");
+        return;
+      }
+      singletonMap.forEach((name, singleton) -> {
+        if (!runSingletons.get()) {
+          return;
+        }
+        try {
+          singleton.start();
+        } catch (Exception e) {
+          log.warn("Exception starting ClusterSingleton {}: {}", singleton, e);
+        }
+      });
+    };
+    if (isReady()) {
+      // wait until all singleton-s are ready for the first startup
+      asyncRunner.accept(initializer);
+    } else {
+      initializer.run();
+    }
+  }
+
+  public void stopClusterSingletons() {
+    singletonMap.forEach((name, singleton) -> {
+      singleton.stop();
+    });
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 36d5e0d..bdcd0b4 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -39,12 +39,9 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -172,33 +169,6 @@ public class CoreContainer {
     }
   }
 
-  public static class ClusterSingletons {
-    private Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
-    // we use this latch to delay the initial startup of singletons, due to
-    // the leader election occurring in parallel with the rest of the load() method.
-    private CountDownLatch readyLatch = new CountDownLatch(1);
-
-    public Map<String, ClusterSingleton> getSingletons() {
-      return singletonMap;
-    }
-
-    public boolean isReady() {
-      return readyLatch.getCount() > 0;
-    }
-
-    public void setReady() {
-      readyLatch.countDown();
-    }
-
-    public void waitUntilReady(long timeout, TimeUnit timeUnit)
-        throws InterruptedException, TimeoutException {
-      boolean await = readyLatch.await(timeout, timeUnit);
-      if (!await) {
-        throw new TimeoutException("Timed out waiting for ClusterSingletons to become ready.");
-      }
-    }
-  }
-
   private volatile PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
 
   /**
@@ -276,7 +246,11 @@ public class CoreContainer {
 
   private final ObjectCache objectCache = new ObjectCache();
 
-  private final ClusterSingletons clusterSingletons = new ClusterSingletons();
+  private final ClusterSingletons clusterSingletons = new ClusterSingletons(
+      () -> getZkController() != null &&
+          getZkController().getOverseer() != null &&
+          !getZkController().getOverseer().isClosed(),
+      (r) -> this.runAsync(r));
   private PackageStoreAPI packageStoreAPI;
   private PackageLoader packageLoader;
 
@@ -695,6 +669,8 @@ public class CoreContainer {
       loader.reloadLuceneSPI();
     }
 
+    customContainerPlugins.registerListener(clusterSingletons.getPluginRegistryListener());
+
     packageStoreAPI = new PackageStoreAPI(this);
     containerHandlers.getApiBag().registerObject(packageStoreAPI.readAPI);
     containerHandlers.getApiBag().registerObject(packageStoreAPI.writeAPI);
@@ -915,7 +891,8 @@ public class CoreContainer {
       containerHandlers.keySet().forEach(handlerName -> {
         SolrRequestHandler handler = containerHandlers.get(handlerName);
         if (handler instanceof ClusterSingleton) {
-          clusterSingletons.singletonMap.put(handlerName, (ClusterSingleton) handler);
+          ClusterSingleton singleton = (ClusterSingleton) handler;
+          clusterSingletons.getSingletons().put(singleton.getName(), singleton);
         }
       });
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 125a4cd..89fda3e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -67,6 +67,7 @@ import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CloudConfig;
+import org.apache.solr.core.ClusterSingletons;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrResourceLoader;
@@ -1422,7 +1423,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
         Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
     when(mockAlwaysUpCoreContainer.isShutDown()).thenReturn(testDone);  // Allow retry on session expiry
     when(mockAlwaysUpCoreContainer.getResourceLoader()).thenReturn(new SolrResourceLoader());
-    CoreContainer.ClusterSingletons singletons = new CoreContainer.ClusterSingletons();
+    ClusterSingletons singletons = new ClusterSingletons(() -> true, r -> r.run());
     // don't wait for all singletons
     singletons.setReady();
     FieldSetter.setField(mockAlwaysUpCoreContainer, CoreContainer.class.getDeclaredField("clusterSingletons"), singletons);