You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/05 17:03:28 UTC
[lucene-solr] 08/11: @482 Everything is whack. I forget everything means
everything. God, I can't believe I have been this far and further before
and then just wandered away from it and forgot I did anything more than
play with some good resource usage and http2. I must have curated for so
long. Even refreshing on it a few times since and I still forget how much
coverage the whackness has.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 9c1bdc66e2a28403bb38f3615ad64c6d8fcde9eb
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Aug 1 11:41:53 2020 -0500
@482 Everything is whack. I forget everything means everything. God, I can't believe I have been this far and further before and then just wandered away from it and forgot I did anything more than play with some good resource usage and http2. I must have curated for so long. Even refreshing on it a few times since and I still forget how much coverage the whackness has.
---
.../org/apache/solr/cloud/RecoveryStrategy.java | 24 +-
.../cloud/TestWaitForStateWithJettyShutdowns.java | 6 +-
.../org/apache/solr/common/ParWorkExecService.java | 18 +-
.../apache/solr/common/cloud/ZkStateReader.java | 327 +++++++++------------
4 files changed, 163 insertions(+), 212 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index d335428..8cce948 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -34,6 +34,7 @@ import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -184,14 +185,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
/** Builds a new HttpSolrClient for use in recovery. Caller must close */
- private final HttpSolrClient buildRecoverySolrClient(final String leaderUrl) {
+ private final Http2SolrClient buildRecoverySolrClient(final String leaderUrl) {
// workaround for SOLR-13605: get the configured timeouts & set them directly
// (even though getRecoveryOnlyHttpClient() already has them set)
final UpdateShardHandlerConfig cfg = cc.getConfig().getUpdateShardHandlerConfig();
- return (new HttpSolrClient.Builder(leaderUrl)
- .withConnectionTimeout(3)
- .withSocketTimeout(5)
- .withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient())
+ return (new Http2SolrClient.Builder(leaderUrl)
+ .withHttpClient(cc.getUpdateShardHandler().getUpdateOnlyHttpClient())
.markInternalRequest()
).build();
}
@@ -339,7 +338,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
final private void commitOnLeader(String leaderUrl) throws SolrServerException,
IOException {
- try (HttpSolrClient client = buildRecoverySolrClient(leaderUrl)) {
+ try (Http2SolrClient client = buildRecoverySolrClient(leaderUrl)) {
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(new ModifiableSolrParams());
ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, "terminal");
@@ -892,7 +891,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
return leaderReplica;
}
- try (HttpSolrClient httpSolrClient = buildRecoverySolrClient(leaderReplica.getCoreUrl())) {
+ try (Http2SolrClient httpSolrClient = buildRecoverySolrClient(leaderReplica.getCoreUrl())) {
SolrPingResponse resp = httpSolrClient.ping();
return leaderReplica;
} catch (IOException e) {
@@ -997,14 +996,15 @@ public class RecoveryStrategy implements Runnable, Closeable {
int conflictWaitMs = zkController.getLeaderConflictResolveWait();
int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "100"));
- try (HttpSolrClient client = buildRecoverySolrClient(leaderBaseUrl)) {
- client.setSoTimeout(readTimeout);
- HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
- prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
+ try (Http2SolrClient client = buildRecoverySolrClient(leaderBaseUrl)) {
+ client.request(prepCmd);
+ // nocommit
+// HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
+// prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
log.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd);
- mrr.future.get();
+ // mrr.future.get();
}
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
index a0859f6..d5ddcbe 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
@@ -67,12 +67,8 @@ public class TestWaitForStateWithJettyShutdowns extends SolrTestCaseJ4 {
log.info("Wait to confirm our node is fully shutdown");
cluster.waitForJettyToStop(nodeToStop);
- // now that we're confident that node has stoped, check if a waitForState
- // call will detect the missing replica -- shouldn't need long wait times (we know it's down)...
log.info("Now check if waitForState will recognize we already have the exepcted state");
- cluster.getSolrClient().waitForState(col_name, 500, TimeUnit.MILLISECONDS, clusterShape(1, 0));
-
-
+ cluster.waitForActiveCollection(col_name, 5000, TimeUnit.MILLISECONDS, 1, 0);
} finally {
cluster.shutdown();
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index def7c9b..6962ba1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -89,9 +89,9 @@ public class ParWorkExecService implements ExecutorService {
public <T> Future<T> doSubmit(Callable<T> callable, boolean requiresAnotherThread) {
- if (shutdown || terminated) {
- throw new RejectedExecutionException();
- }
+// if (shutdown || terminated) {
+// throw new RejectedExecutionException();
+// }
try {
if (!requiresAnotherThread) {
boolean success = checkLoad();
@@ -183,9 +183,9 @@ public class ParWorkExecService implements ExecutorService {
}
public Future<?> doSubmit(Runnable runnable, boolean requiresAnotherThread) {
- if (shutdown || terminated) {
- throw new RejectedExecutionException();
- }
+// if (shutdown || terminated) {
+// throw new RejectedExecutionException();
+// }
if (!requiresAnotherThread) {
boolean success = checkLoad();
if (success) {
@@ -248,9 +248,9 @@ public class ParWorkExecService implements ExecutorService {
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> collection)
throws InterruptedException {
- if (shutdown || terminated) {
- throw new RejectedExecutionException();
- }
+// if (shutdown || terminated) {
+// throw new RejectedExecutionException();
+// }
List<Future<T>> futures = new ArrayList<>(collection.size());
for (Callable c : collection) {
futures.add(submit(c));
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 50739c8..6beea1a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -213,14 +213,14 @@ public class ZkStateReader implements SolrCloseable {
private final Runnable securityNodeListener;
- private ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>(16, 0.75f, 5);
// named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map.
- private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsObservers = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<String, CollectionPropsWatcher> collectionPropsObservers = new ConcurrentHashMap<>(16, 0.75f, 5);
private Set<CloudCollectionsListener> cloudCollectionsListeners = ConcurrentHashMap.newKeySet();
- private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
+ private final ExecutorService notifications = ParWork.getExecutor();
private Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();
@@ -229,7 +229,7 @@ public class ZkStateReader implements SolrCloseable {
/**
* Used to submit notifications to Collection Properties watchers in order
**/
- private final ExecutorService collectionPropsNotifications = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("collectionPropsNotifications"));
+ private final ExecutorService collectionPropsNotifications = ParWork.getExecutor();
private static final long LAZY_CACHE_TIME = TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
@@ -496,8 +496,20 @@ public class ZkStateReader implements SolrCloseable {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
- collectionPropsObservers.forEach((k, v) -> {
- collectionPropsWatchers.computeIfAbsent(k, PropsWatcher::new).refreshAndWatch(true);
+ collectionPropsObservers.forEach((c, v) -> {
+ Stat stat = new Stat();
+ byte[] data = new byte[0];
+ try {
+ data = zkClient.getData(getCollectionPropsPath(c), new PropsWatcher(c), stat);
+ } catch (KeeperException e) {
+ log.error("KeeperException", e);
+ } catch (InterruptedException e) {
+ ParWork.propegateInterrupt(e);
+ }
+
+ VersionedCollectionProps props = new VersionedCollectionProps(
+ stat.getVersion(), (Map<String,String>) fromJSON(data));
+ watchedCollectionProps.put(c, props);
});
}
@@ -944,7 +956,7 @@ public class ZkStateReader implements SolrCloseable {
public Replica getLeader(Set<String> liveNodes, DocCollection docCollection, String shard) {
Replica replica = docCollection != null ? docCollection.getLeader(shard) : null;
- if (replica != null && liveNodes.contains(replica.getNodeName())) {
+ if (replica != null && liveNodes.contains(replica.getNodeName()) && replica.getState() == Replica.State.ACTIVE) {
return replica;
}
return null;
@@ -1194,40 +1206,33 @@ public class ZkStateReader implements SolrCloseable {
* @return a map representing the key/value properties for the collection.
*/
public Map<String, String> getCollectionProperties(final String collection, long cacheForMillis) {
- synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
- Watcher watcher = null;
- if (cacheForMillis > 0) {
- watcher = collectionPropsWatchers.compute(collection,
- (c, w) -> w == null ? new PropsWatcher(c, cacheForMillis) : w.renew(cacheForMillis));
- }
- VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
- boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > System.nanoTime();
- long untilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis, TimeUnit.MILLISECONDS);
- Map<String, String> properties;
- if (haveUnexpiredProps) {
- properties = vprops.props;
- vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
- } else {
- try {
- VersionedCollectionProps vcp = fetchCollectionProperties(collection, watcher);
- properties = vcp.props;
- if (cacheForMillis > 0) {
- vcp.cacheUntilNs = untilNs;
- watchedCollectionProps.put(collection, vcp);
- } else {
- // we're synchronized on watchedCollectionProps and we can only get here if we have found an expired
- // vprops above, so it is safe to remove the cached value and let the GC free up some mem a bit sooner.
- if (!collectionPropsObservers.containsKey(collection)) {
- watchedCollectionProps.remove(collection);
- }
+ VersionedCollectionProps properties = watchedCollectionProps.get(collection);
+
+ if (properties == null) {
+ synchronized (watchedCollectionProps) {
+ properties = watchedCollectionProps.get(collection);
+ if (properties == null) {
+ PropsWatcher propsWatcher = new PropsWatcher(collection);
+ // load it
+ Stat stat = new Stat();
+ try {
+ byte[] data = zkClient.getData(getCollectionPropsPath(collection), propsWatcher, stat);
+
+ VersionedCollectionProps props = new VersionedCollectionProps(
+ stat.getVersion(), (Map<String,String>) fromJSON(data));
+ watchedCollectionProps.put(collection, props);
+ properties = props;
+ } catch (KeeperException e) {
+ log.error("KeeperException", e);
+ } catch (InterruptedException e) {
+ ParWork.propegateInterrupt(e);
}
- } catch (Exception e) {
- ParWork.propegateInterrupt(e);
- throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
+
}
}
- return properties;
}
+
+ return properties.props;
}
private class VersionedCollectionProps {
@@ -1245,38 +1250,38 @@ public class ZkStateReader implements SolrCloseable {
return COLLECTIONS_ZKNODE + '/' + collection + '/' + COLLECTION_PROPS_ZKNODE;
}
- @SuppressWarnings("unchecked")
- private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
- final String znodePath = getCollectionPropsPath(collection);
- // lazy init cache cleaner once we know someone is using collection properties.
- if (collectionPropsCacheCleaner == null) {
- synchronized (this) { // There can be only one! :)
- if (collectionPropsCacheCleaner == null) {
- collectionPropsCacheCleaner = notifications.submit(new CacheCleaner());
- }
- }
- }
- while (true) {
- try {
- Stat stat = new Stat();
- byte[] data = zkClient.getData(znodePath, watcher, stat);
- return new VersionedCollectionProps(stat.getVersion(), (Map<String, String>) Utils.fromJSON(data));
- } catch (ClassCastException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
- } catch (KeeperException.NoNodeException e) {
- if (watcher != null) {
- // Leave an exists watch in place in case a collectionprops.json is created later.
- Stat exists = zkClient.exists(znodePath, watcher);
- if (exists != null) {
- // Rare race condition, we tried to fetch the data and couldn't find it, then we found it exists.
- // Loop and try again.
- continue;
- }
- }
- return new VersionedCollectionProps(-1, EMPTY_MAP);
- }
- }
- }
+// @SuppressWarnings("unchecked")
+// private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
+// final String znodePath = getCollectionPropsPath(collection);
+// // lazy init cache cleaner once we know someone is using collection properties.
+// if (collectionPropsCacheCleaner == null) {
+// synchronized (this) { // There can be only one! :)
+// if (collectionPropsCacheCleaner == null) {
+// collectionPropsCacheCleaner = notifications.submit(new CacheCleaner());
+// }
+// }
+// }
+// while (true) {
+// try {
+// Stat stat = new Stat();
+// byte[] data = zkClient.getData(znodePath, watcher, stat);
+// return new VersionedCollectionProps(stat.getVersion(), (Map<String, String>) Utils.fromJSON(data));
+// } catch (ClassCastException e) {
+// throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
+// } catch (KeeperException.NoNodeException e) {
+// if (watcher != null) {
+// // Leave an exists watch in place in case a collectionprops.json is created later.
+// Stat exists = zkClient.exists(znodePath, watcher);
+// if (exists != null) {
+// // Rare race condition, we tried to fetch the data and couldn't find it, then we found it exists.
+// // Loop and try again.
+// continue;
+// }
+// }
+// return new VersionedCollectionProps(-1, EMPTY_MAP);
+// }
+// }
+// }
/**
* Returns the content of /security.json from ZooKeeper as a Map
@@ -1425,11 +1430,6 @@ public class ZkStateReader implements SolrCloseable {
watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis, TimeUnit.MILLISECONDS);
}
- public PropsWatcher renew(long forMillis) {
- watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis, TimeUnit.MILLISECONDS);
- return this;
- }
-
@Override
public void process(WatchedEvent event) {
// session events are not change events, and do not remove the watcher
@@ -1437,59 +1437,76 @@ public class ZkStateReader implements SolrCloseable {
return;
}
- boolean expired = System.nanoTime() > watchUntilNs;
- if (!collectionPropsObservers.containsKey(coll) && expired) {
- // No one can be notified of the change, we can ignore it and "unset" the watch
- log.debug("Ignoring property change for collection {}", coll);
- return;
- }
+ if (EventType.NodeDataChanged.equals(event.getType())) {
+ // load it
+ Stat stat = new Stat();
+ try {
+ byte[] data = zkClient.getData(getCollectionPropsPath(coll), this, stat);
- log.info("A collection property change: [{}] for collection [{}] has occurred - updating...",
- event, coll);
+ VersionedCollectionProps props = new VersionedCollectionProps(
+ stat.getVersion(), (Map<String,String>) fromJSON(data));
+ watchedCollectionProps.put(coll, props);
- refreshAndWatch(true);
+ try (ParWork work = new ParWork(this, true)) {
+ for (CollectionPropsWatcher observer : collectionPropsObservers.values()) {
+ work.collect(() -> {
+ observer.onStateChanged(props.props);
+ });
+ }
+ }
+
+ // Stat stat = new Stat();
+ // byte[] data = zkClient.getData(znodePath, watcher, stat);
+ // return new VersionedCollectionProps(stat.getVersion(), (Map<String, String>) Utils.fromJSON(data));
+ } catch (KeeperException e) {
+ log.error("", e);
+ } catch (InterruptedException e) {
+ ParWork.propegateInterrupt(e);
+ }
+ }
+
}
/**
* Refresh collection properties from ZK and leave a watch for future changes. Updates the properties in
* watchedCollectionProps with the results of the refresh. Optionally notifies watchers
*/
- void refreshAndWatch(boolean notifyWatchers) {
- try {
- synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
- VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
- Map<String, String> properties = vcp.props;
- VersionedCollectionProps existingVcp = watchedCollectionProps.get(coll);
- if (existingVcp == null || // never called before, record what we found
- vcp.zkVersion > existingVcp.zkVersion || // newer info we should update
- vcp.zkVersion == -1) { // node was deleted start over
- watchedCollectionProps.put(coll, vcp);
- if (notifyWatchers) {
- notifyPropsWatchers(coll, properties);
- }
- if (vcp.zkVersion == -1 && existingVcp != null) { // Collection DELETE detected
-
- // We should not be caching a collection that has been deleted.
- watchedCollectionProps.remove(coll);
-
- // core ref counting not relevant here, don't need canRemove(), we just sent
- // a notification of an empty set of properties, no reason to watch what doesn't exist.
- collectionPropsObservers.remove(coll);
-
- // This is the one time we know it's safe to throw this out. We just failed to set the watch
- // due to an NoNodeException, so it isn't held by ZK and can't re-set itself due to an update.
- collectionPropsWatchers.remove(coll);
- }
- }
- }
- } catch (KeeperException e) {
- log.error("Lost collection property watcher for {} due to ZK error", coll, e);
- throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("Lost collection property watcher for {} due to the thread being interrupted", coll, e);
- }
- }
+// void refreshAndWatch(boolean notifyWatchers) {
+// try {
+// synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
+// VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
+// Map<String, String> properties = vcp.props;
+// VersionedCollectionProps existingVcp = watchedCollectionProps.get(coll);
+// if (existingVcp == null || // never called before, record what we found
+// vcp.zkVersion > existingVcp.zkVersion || // newer info we should update
+// vcp.zkVersion == -1) { // node was deleted start over
+// watchedCollectionProps.put(coll, vcp);
+// if (notifyWatchers) {
+// notifyPropsWatchers(coll, properties);
+// }
+// if (vcp.zkVersion == -1 && existingVcp != null) { // Collection DELETE detected
+//
+// // We should not be caching a collection that has been deleted.
+// watchedCollectionProps.remove(coll);
+//
+// // core ref counting not relevant here, don't need canRemove(), we just sent
+// // a notification of an empty set of properties, no reason to watch what doesn't exist.
+// collectionPropsObservers.remove(coll);
+//
+// // This is the one time we know it's safe to throw this out. We just failed to set the watch
+// // due to an NoNodeException, so it isn't held by ZK and can't re-set itself due to an update.
+// collectionPropsWatchers.remove(coll);
+// }
+// }
+// }
+// } catch (KeeperException e) {
+// log.error("Lost collection property watcher for {} due to ZK error", coll, e);
+// throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+// } catch (InterruptedException e) {
+// Thread.currentThread().interrupt();
+// log.error("Lost collection property watcher for {} due to the thread being interrupted", coll, e);
+// }
+// }
}
/**
@@ -1974,38 +1991,15 @@ public class ZkStateReader implements SolrCloseable {
return updated;
}
- public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) {
- AtomicBoolean watchSet = new AtomicBoolean(false);
- collectionPropsObservers.compute(collection, (k, v) -> {
- if (v == null) {
- v = new CollectionWatch<>();
- watchSet.set(true);
- }
- v.stateWatchers.add(propsWatcher);
- return v;
- });
-
- if (watchSet.get()) {
- collectionPropsWatchers.computeIfAbsent(collection, PropsWatcher::new).refreshAndWatch(false);
- }
+ public void removeCollectionPropsWatcher(String collection, CollectionPropsWatcher watcher) {
+ collectionPropsObservers.remove(collection);
}
- public void removeCollectionPropsWatcher(String collection, CollectionPropsWatcher watcher) {
- collectionPropsObservers.compute(collection, (k, v) -> {
- if (v == null)
- return null;
- v.stateWatchers.remove(watcher);
- if (v.canBeRemoved()) {
- // don't want this to happen in middle of other blocks that might add it back.
- synchronized (watchedCollectionProps) {
- watchedCollectionProps.remove(collection);
- }
- return null;
- }
- return v;
- });
+ public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) {
+ collectionPropsObservers.put(collection, propsWatcher);
}
+
public static class ConfigData {
public Map<String, Object> data;
public int version;
@@ -2239,45 +2233,6 @@ public class ZkStateReader implements SolrCloseable {
}
- private void notifyPropsWatchers(String collection, Map<String, String> properties) {
- try {
- collectionPropsNotifications.submit(new PropsNotification(collection, properties));
- } catch (RejectedExecutionException e) {
- if (!closed) {
- log.error("Couldn't run collection properties notifications for {}", collection, e);
- }
- }
- }
-
- private class PropsNotification implements Runnable {
-
- private final String collection;
- private final Map<String, String> collectionProperties;
- private final List<CollectionPropsWatcher> watchers = new ArrayList<>();
-
- private PropsNotification(String collection, Map<String, String> collectionProperties) {
- this.collection = collection;
- this.collectionProperties = collectionProperties;
- // guarantee delivery of notification regardless of what happens to collectionPropsObservers
- // while we wait our turn in the executor by capturing the list on creation.
- collectionPropsObservers.compute(collection, (k, v) -> {
- if (v == null)
- return null;
- watchers.addAll(v.stateWatchers);
- return v;
- });
- }
-
- @Override
- public void run() {
- for (CollectionPropsWatcher watcher : watchers) {
- if (watcher.onStateChanged(collectionProperties)) {
- removeCollectionPropsWatcher(collection, watcher);
- }
- }
- }
- }
-
private class CacheCleaner implements Runnable {
public void run() {
while (!Thread.interrupted()) {