You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dw...@apache.org on 2021/03/10 09:58:10 UTC
[lucene] 05/06: SOLR-14749: Move much of the singleton mgmgt into
the utility class.
This is an automated email from the ASF dual-hosted git repository.
dweiss pushed a commit to branch jira/solr-14749-cluster-singleton
in repository https://gitbox.apache.org/repos/asf/lucene.git
commit e549845ee72b7d547713904d3217141ffacee22f
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Oct 19 13:40:53 2020 +0200
SOLR-14749: Move much of the singleton mgmgt into the utility class.
---
.../apache/solr/api/CustomContainerPlugins.java | 49 ++-----
.../src/java/org/apache/solr/cloud/Overseer.java | 46 +-----
.../org/apache/solr/core/ClusterSingletons.java | 162 +++++++++++++++++++++
.../java/org/apache/solr/core/CoreContainer.java | 41 ++----
.../test/org/apache/solr/cloud/OverseerTest.java | 3 +-
5 files changed, 186 insertions(+), 115 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
index 616fb9d..5f4d9a7 100644
--- a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
+++ b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
@@ -34,7 +34,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.lucene.util.ResourceLoaderAware;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.beans.PluginMeta;
-import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.annotation.JsonProperty;
@@ -57,9 +56,10 @@ import static org.apache.lucene.util.IOUtils.closeWhileHandlingException;
import static org.apache.solr.common.util.Utils.makeMap;
public class CustomContainerPlugins implements ClusterPropertiesListener, MapWriter {
- private final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
+
private final List<PluginRegistryListener> listeners = new CopyOnWriteArrayList<>();
final CoreContainer coreContainer;
@@ -124,7 +124,6 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
ApiInfo apiInfo = currentPlugins.remove(e.getKey());
if (apiInfo == null) continue;
listeners.forEach(listener -> listener.deleted(apiInfo));
- handleClusterSingleton(null, apiInfo);
for (ApiHolder holder : apiInfo.holders) {
Api old = containerApiBag.unregister(holder.api.getEndPoint().method()[0],
getActualPath(apiInfo, holder.api.getEndPoint().path()[0]));
@@ -156,7 +155,6 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
currentPlugins.put(e.getKey(), apiInfo);
final ApiInfo apiInfoFinal = apiInfo;
listeners.forEach(listener -> listener.added(apiInfoFinal));
- handleClusterSingleton(apiInfo, null);
} else {
//this plugin is being updated
ApiInfo old = currentPlugins.put(e.getKey(), apiInfo);
@@ -166,7 +164,6 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
}
final ApiInfo apiInfoFinal = apiInfo;
listeners.forEach(listener -> listener.modified(old, apiInfoFinal));
- handleClusterSingleton(apiInfo, old);
if (old != null) {
//this is an update of the plugin. But, it is possible that
// some paths are remved in the newer version of the plugin
@@ -187,36 +184,6 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
}
}
- private void handleClusterSingleton(ApiInfo newApiInfo, ApiInfo oldApiInfo) {
- if (newApiInfo != null) {
- // register new api
- Object instance = newApiInfo.getInstance();
- if (instance instanceof ClusterSingleton) {
- ClusterSingleton singleton = (ClusterSingleton) instance;
- coreContainer.getClusterSingletons().getSingletons().put(singleton.getName(), singleton);
- // check to see if we should immediately start this singleton
- if (coreContainer.getZkController() != null &&
- coreContainer.getZkController().getOverseer() != null &&
- !coreContainer.getZkController().getOverseer().isClosed()) {
- try {
- singleton.start();
- } catch (Exception exc) {
- log.warn("Exception starting ClusterSingleton {}: {}", newApiInfo, exc);
- }
- }
- }
- }
- if (oldApiInfo != null) {
- // stop & unregister the old api
- Object instance = oldApiInfo.getInstance();
- if (instance instanceof ClusterSingleton) {
- ClusterSingleton singleton = (ClusterSingleton) instance;
- singleton.stop();
- coreContainer.getClusterSingletons().getSingletons().remove(singleton.getName());
- }
- }
- }
-
private static String getActualPath(ApiInfo apiInfo, String path) {
path = path.replaceAll("\\$path-prefix", apiInfo.info.pathPrefix);
path = path.replaceAll("\\$plugin-name", apiInfo.info.name);
@@ -329,7 +296,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
}
try {
- List<Api> apis = AnnotatedApi.getApis(klas, null, false);
+ List<Api> apis = AnnotatedApi.getApis(klas, null, true);
for (Object api : apis) {
EndPoint endPoint = ((AnnotatedApi) api).getEndPoint();
if (endPoint.path().length > 1 || endPoint.method().length > 1) {
@@ -381,7 +348,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
}
}
this.holders = new ArrayList<>();
- for (Api api : AnnotatedApi.getApis(instance.getClass(), instance, false)) {
+ for (Api api : AnnotatedApi.getApis(instance.getClass(), instance, true)) {
holders.add(new ApiHolder((AnnotatedApi) api));
}
}
@@ -421,12 +388,18 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
return null;
}
- interface PluginRegistryListener {
+ /**
+ * Listener for notifications about added / deleted / modified plugins.
+ */
+ public interface PluginRegistryListener {
+ /** Called when a new plugin is added. */
void added(ApiInfo plugin);
+ /** Called when an existing plugin is deleted. */
void deleted(ApiInfo plugin);
+ /** Called when an existing plugin is replaced. */
void modified(ApiInfo old, ApiInfo replacement);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 4c206ae..fc0d0eb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,8 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.apache.lucene.util.Version;
@@ -657,7 +655,7 @@ public class Overseer implements SolrCloseable {
}
});
- startClusterSingletons();
+ getCoreContainer().getClusterSingletons().startClusterSingletons();
assert ObjectReleaseTracker.track(this);
}
@@ -781,50 +779,10 @@ public class Overseer implements SolrCloseable {
/**
* Start {@link ClusterSingleton} plugins when we become the leader.
*/
- public void startClusterSingletons() {
- CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
- final Runnable initializer = () -> {
- if (isClosed()) {
- return;
- }
- try {
- singletons.waitUntilReady(60, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("Interrupted initialization of ClusterSingleton-s");
- return;
- } catch (TimeoutException te) {
- log.warn("Timed out during initialization of ClusterSingleton-s");
- return;
- }
- singletons.getSingletons().forEach((name, singleton) -> {
- try {
- singleton.start();
- } catch (Exception e) {
- log.warn("Exception starting ClusterSingleton {}: {}", singleton, e);
- }
- });
- };
- if (singletons.isReady()) {
- // wait until all singleton-s are ready for the first startup
- getCoreContainer().runAsync(initializer);
- } else {
- initializer.run();
- }
- }
/**
* Stop {@link ClusterSingleton} plugins when we lose leadership.
*/
- private void stopClusterSingletons() {
- CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
- if (singletons == null) {
- return;
- }
- singletons.getSingletons().forEach((name, singleton) -> {
- singleton.stop();
- });
- }
-
public Stats getStats() {
return stats;
}
@@ -866,7 +824,7 @@ public class Overseer implements SolrCloseable {
}
// stop singletons only on the leader
if (!this.closed) {
- stopClusterSingletons();
+ getCoreContainer().getClusterSingletons().stopClusterSingletons();
}
this.closed = true;
doClose();
diff --git a/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java b/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java
new file mode 100644
index 0000000..c929e83
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.core;
+
+import org.apache.solr.api.CustomContainerPlugins;
+import org.apache.solr.cloud.ClusterSingleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Helper class to manage the initial registration of {@link ClusterSingleton} plugins and
+ * to track the changes in loaded plugins in {@link org.apache.solr.api.CustomContainerPlugins}.
+ */
+public class ClusterSingletons {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
+ private final Supplier<Boolean> runSingletons;
+ private final Consumer<Runnable> asyncRunner;
+ private final CustomContainerPlugins.PluginRegistryListener pluginListener;
+
+ // we use this latch to delay the initial startup of singletons, due to
+ // the leader election occurring in parallel with the rest of the load() method.
+ private final CountDownLatch readyLatch = new CountDownLatch(1);
+
+ /**
+ * Create a helper to manage singletons.
+ * @param runSingletons this function returns true when singletons should be running. It's
+ * used when adding or modifying existing plugins.
+ * @param asyncRunner async runner that will be used for starting up each singleton.
+ */
+ public ClusterSingletons(Supplier<Boolean> runSingletons, Consumer<Runnable> asyncRunner) {
+ this.runSingletons = runSingletons;
+ this.asyncRunner = asyncRunner;
+ pluginListener = new CustomContainerPlugins.PluginRegistryListener() {
+ @Override
+ public void added(CustomContainerPlugins.ApiInfo plugin) {
+ // register new api
+ Object instance = plugin.getInstance();
+ if (instance instanceof ClusterSingleton) {
+ ClusterSingleton singleton = (ClusterSingleton) instance;
+ singletonMap.put(singleton.getName(), singleton);
+ // check to see if we should immediately start this singleton
+ if (isReady() && runSingletons.get()) {
+ try {
+ singleton.start();
+ } catch (Exception exc) {
+ log.warn("Exception starting ClusterSingleton {}: {}", plugin, exc);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void deleted(CustomContainerPlugins.ApiInfo plugin) {
+ Object instance = plugin.getInstance();
+ if (instance instanceof ClusterSingleton) {
+ ClusterSingleton singleton = (ClusterSingleton) instance;
+ singleton.stop();
+ singletonMap.remove(singleton.getName());
+ }
+ }
+
+ @Override
+ public void modified(CustomContainerPlugins.ApiInfo old, CustomContainerPlugins.ApiInfo replacement) {
+ added(replacement);
+ deleted(old);
+ }
+ };
+ }
+
+ public CustomContainerPlugins.PluginRegistryListener getPluginRegistryListener() {
+ return pluginListener;
+ }
+
+ public Map<String, ClusterSingleton> getSingletons() {
+ return singletonMap;
+ }
+
+ public boolean isReady() {
+ return readyLatch.getCount() == 0;
+ }
+
+ public void setReady() {
+ readyLatch.countDown();
+ }
+
+ public void waitUntilReady(long timeout, TimeUnit timeUnit)
+ throws InterruptedException, TimeoutException {
+ boolean await = readyLatch.await(timeout, timeUnit);
+ if (!await) {
+ throw new TimeoutException("Timed out waiting for ClusterSingletons to become ready.");
+ }
+ }
+
+ /**
+ * Start singletons when the helper is ready and when it's supposed to start
+ * (as determined by {@link #runSingletons} function).
+ */
+ public void startClusterSingletons() {
+ final Runnable initializer = () -> {
+ if (!runSingletons.get()) {
+ return;
+ }
+ try {
+ waitUntilReady(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted initialization of ClusterSingleton-s");
+ return;
+ } catch (TimeoutException te) {
+ log.warn("Timed out during initialization of ClusterSingleton-s");
+ return;
+ }
+ singletonMap.forEach((name, singleton) -> {
+ if (!runSingletons.get()) {
+ return;
+ }
+ try {
+ singleton.start();
+ } catch (Exception e) {
+ log.warn("Exception starting ClusterSingleton {}: {}", singleton, e);
+ }
+ });
+ };
+ if (isReady()) {
+ // wait until all singleton-s are ready for the first startup
+ asyncRunner.accept(initializer);
+ } else {
+ initializer.run();
+ }
+ }
+
+ public void stopClusterSingletons() {
+ singletonMap.forEach((name, singleton) -> {
+ singleton.stop();
+ });
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 36d5e0d..bdcd0b4 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -39,12 +39,9 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
@@ -172,33 +169,6 @@ public class CoreContainer {
}
}
- public static class ClusterSingletons {
- private Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
- // we use this latch to delay the initial startup of singletons, due to
- // the leader election occurring in parallel with the rest of the load() method.
- private CountDownLatch readyLatch = new CountDownLatch(1);
-
- public Map<String, ClusterSingleton> getSingletons() {
- return singletonMap;
- }
-
- public boolean isReady() {
- return readyLatch.getCount() > 0;
- }
-
- public void setReady() {
- readyLatch.countDown();
- }
-
- public void waitUntilReady(long timeout, TimeUnit timeUnit)
- throws InterruptedException, TimeoutException {
- boolean await = readyLatch.await(timeout, timeUnit);
- if (!await) {
- throw new TimeoutException("Timed out waiting for ClusterSingletons to become ready.");
- }
- }
- }
-
private volatile PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
/**
@@ -276,7 +246,11 @@ public class CoreContainer {
private final ObjectCache objectCache = new ObjectCache();
- private final ClusterSingletons clusterSingletons = new ClusterSingletons();
+ private final ClusterSingletons clusterSingletons = new ClusterSingletons(
+ () -> getZkController() != null &&
+ getZkController().getOverseer() != null &&
+ !getZkController().getOverseer().isClosed(),
+ (r) -> this.runAsync(r));
private PackageStoreAPI packageStoreAPI;
private PackageLoader packageLoader;
@@ -695,6 +669,8 @@ public class CoreContainer {
loader.reloadLuceneSPI();
}
+ customContainerPlugins.registerListener(clusterSingletons.getPluginRegistryListener());
+
packageStoreAPI = new PackageStoreAPI(this);
containerHandlers.getApiBag().registerObject(packageStoreAPI.readAPI);
containerHandlers.getApiBag().registerObject(packageStoreAPI.writeAPI);
@@ -915,7 +891,8 @@ public class CoreContainer {
containerHandlers.keySet().forEach(handlerName -> {
SolrRequestHandler handler = containerHandlers.get(handlerName);
if (handler instanceof ClusterSingleton) {
- clusterSingletons.singletonMap.put(handlerName, (ClusterSingleton) handler);
+ ClusterSingleton singleton = (ClusterSingleton) handler;
+ clusterSingletons.getSingletons().put(singleton.getName(), singleton);
}
});
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 125a4cd..89fda3e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -67,6 +67,7 @@ import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloudConfig;
+import org.apache.solr.core.ClusterSingletons;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrResourceLoader;
@@ -1422,7 +1423,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
when(mockAlwaysUpCoreContainer.isShutDown()).thenReturn(testDone); // Allow retry on session expiry
when(mockAlwaysUpCoreContainer.getResourceLoader()).thenReturn(new SolrResourceLoader());
- CoreContainer.ClusterSingletons singletons = new CoreContainer.ClusterSingletons();
+ ClusterSingletons singletons = new ClusterSingletons(() -> true, r -> r.run());
// don't wait for all singletons
singletons.setReady();
FieldSetter.setField(mockAlwaysUpCoreContainer, CoreContainer.class.getDeclaredField("clusterSingletons"), singletons);