Posted to commits@solr.apache.org by no...@apache.org on 2022/09/16 03:12:38 UTC
[solr] branch main updated: refactor out onReconnect into a method
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new d3a6de0284a refactor out onReconnect into a method
d3a6de0284a is described below
commit d3a6de0284a295994385bf1d061a9bd221044b20
Author: Noble Paul <no...@gmail.com>
AuthorDate: Fri Sep 16 13:12:27 2022 +1000
refactor out onReconnect into a method
---
.../java/org/apache/solr/cloud/ZkController.java | 245 ++++++++++-----------
1 file changed, 118 insertions(+), 127 deletions(-)
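In short, the diff below replaces an anonymous OnReconnect inner class (and a matching anonymous BeforeReconnect) inside the ZkController constructor with a lambda that delegates to a new private onReconnect(...) method, leaving behavior unchanged. The following is a minimal, self-contained sketch of that extract-callback-to-method pattern; the names here (ReconnectCallback, ExtractCallbackSketch) are simplified stand-ins for illustration, not Solr's actual API:

    import java.util.List;
    import java.util.function.Supplier;

    public class ExtractCallbackSketch {

      // Simplified stand-in for org.apache.solr.common.cloud.OnReconnect:
      // a single-abstract-method callback, so a lambda can implement it.
      interface ReconnectCallback {
        void command() throws Exception;
      }

      // Before: the whole reconnect routine is inlined as an anonymous class,
      // bloating the constructor call it appears in.
      static ReconnectCallback before(Supplier<List<String>> descriptorsSupplier) {
        return new ReconnectCallback() {
          @Override
          public void command() {
            // ... ~100 lines of reconnect logic inline ...
            System.out.println("reconnect: " + descriptorsSupplier.get());
          }
        };
      }

      // After: a lambda delegating to a named private method, mirroring
      // "() -> onReconnect(descriptorsSupplier)" in the diff below.
      static ReconnectCallback after(Supplier<List<String>> descriptorsSupplier) {
        return () -> onReconnect(descriptorsSupplier);
      }

      private static void onReconnect(Supplier<List<String>> descriptorsSupplier) {
        System.out.println("reconnect: " + descriptorsSupplier.get());
      }

      public static void main(String[] args) throws Exception {
        Supplier<List<String>> cores = () -> List.of("core1", "core2");
        before(cores).command();
        after(cores).command();
      }
    }

Extracting the body into a named method keeps the constructor readable and makes the reconnect logic addressable (and testable) on its own.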
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 7126ee7cba2..1307abfada8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -73,7 +73,6 @@ import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.StringUtils;
-import org.apache.solr.common.cloud.BeforeReconnect;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DefaultConnectionStrategy;
import org.apache.solr.common.cloud.DefaultZkACLProvider;
@@ -382,133 +381,15 @@ public class ZkController implements Closeable {
clientTimeout,
zkClientConnectTimeout,
strat,
- // on reconnect, reload cloud info
- new OnReconnect() {
-
- @Override
- public void command() throws SessionExpiredException {
- log.info(
- "ZooKeeper session re-connected ... refreshing core states after session expiration.");
- clearZkCollectionTerms();
- try {
- // recreate our watchers first so that they exist even on any problems below
- zkStateReader.createClusterStateWatchersAndUpdate();
-
- // this is troublesome - we dont want to kill anything the old
- // leader accepted
- // though I guess sync will likely get those updates back? But
- // only if
- // he is involved in the sync, and he certainly may not be
- // ExecutorUtil.shutdownAndAwaitTermination(cc.getCmdDistribExecutor());
- // we need to create all of our lost watches
-
- // seems we dont need to do this again...
- // Overseer.createClientNodes(zkClient, getNodeName());
-
- // start the overseer first as following code may need it's processing
- if (!zkRunOnly) {
- ElectionContext context =
- new OverseerElectionContext(zkClient, overseer, getNodeName());
-
- ElectionContext prevContext = overseerElector.getContext();
- if (prevContext != null) {
- prevContext.cancelElection();
- prevContext.close();
- }
-
- overseerElector.setup(context);
-
- if (cc.nodeRoles.isOverseerAllowedOrPreferred()) {
- overseerElector.joinElection(context, true);
- }
- }
-
- cc.cancelCoreRecoveries();
-
- try {
- registerAllCoresAsDown(descriptorsSupplier, false);
- } catch (SessionExpiredException e) {
- // zk has to reconnect and this will all be tried again
- throw e;
- } catch (Exception e) {
- // this is really best effort - in case of races or failure cases where we now
- // need to be the leader, if anything fails, just continue
- log.warn("Exception while trying to register all cores as DOWN", e);
- }
-
- // we have to register as live first to pick up docs in the buffer
- createEphemeralLiveNode();
-
- List<CoreDescriptor> descriptors = descriptorsSupplier.get();
- // re register all descriptors
- ExecutorService executorService =
- (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
- if (descriptors != null) {
- for (CoreDescriptor descriptor : descriptors) {
- // TODO: we need to think carefully about what happens when it was a leader
- // that was expired - as well as what to do about leaders/overseers with
- // connection loss
- try {
- // unload solrcores that have been 'failed over'
- throwErrorIfReplicaReplaced(descriptor);
-
- if (executorService != null) {
- executorService.submit(new RegisterCoreAsync(descriptor, true, true));
- } else {
- register(descriptor.getName(), descriptor, true, true, false);
- }
- } catch (Exception e) {
- SolrException.log(log, "Error registering SolrCore", e);
- }
- }
- }
-
- // notify any other objects that need to know when the session was re-connected
- HashSet<OnReconnect> clonedListeners;
- synchronized (reconnectListeners) {
- clonedListeners = (HashSet<OnReconnect>) reconnectListeners.clone();
- }
- // the OnReconnect operation can be expensive per listener, so do that async in
- // the background
- for (OnReconnect listener : clonedListeners) {
- try {
- if (executorService != null) {
- executorService.submit(new OnReconnectNotifyAsync(listener));
- } else {
- listener.command();
- }
- } catch (Exception exc) {
- // not much we can do here other than warn in the log
- log.warn(
- "Error when notifying OnReconnect listener {} after session re-connected.",
- listener,
- exc);
- }
- }
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (SessionExpiredException e) {
- throw e;
- } catch (Exception e) {
- SolrException.log(log, "", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
- }
- }
- },
- new BeforeReconnect() {
-
- @Override
- public void command() {
- try {
- ZkController.this.overseer.close();
- } catch (Exception e) {
- log.error("Error trying to stop any Overseer threads", e);
- }
- closeOutstandingElections(descriptorsSupplier);
- markAllAsNotLeader(descriptorsSupplier);
+ () -> onReconnect(descriptorsSupplier),
+ () -> {
+ try {
+ this.overseer.close();
+ } catch (Exception e) {
+ log.error("Error trying to stop any Overseer threads", e);
}
+ closeOutstandingElections(descriptorsSupplier);
+ markAllAsNotLeader(descriptorsSupplier);
},
zkACLProvider,
cc::isShutDown);
@@ -544,6 +425,116 @@ public class ZkController implements Closeable {
assert ObjectReleaseTracker.track(this);
}
+ private void onReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier)
+ throws SessionExpiredException {
+ // on reconnect, reload cloud info
+ log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
+ clearZkCollectionTerms();
+ try {
+ // recreate our watchers first so that they exist even on any problems below
+ zkStateReader.createClusterStateWatchersAndUpdate();
+
+ // this is troublesome - we don't want to kill anything the old
+ // leader accepted
+ // though I guess sync will likely get those updates back? But
+ // only if
+ // he is involved in the sync, and he certainly may not be
+ // ExecutorUtil.shutdownAndAwaitTermination(cc.getCmdDistribExecutor());
+ // we need to create all of our lost watches
+
+ // seems we don't need to do this again...
+ // Overseer.createClientNodes(zkClient, getNodeName());
+
+      // start the overseer first as following code may need its processing
+ if (!zkRunOnly) {
+ ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
+
+ ElectionContext prevContext = overseerElector.getContext();
+ if (prevContext != null) {
+ prevContext.cancelElection();
+ prevContext.close();
+ }
+
+ overseerElector.setup(context);
+
+ if (cc.nodeRoles.isOverseerAllowedOrPreferred()) {
+ overseerElector.joinElection(context, true);
+ }
+ }
+
+ cc.cancelCoreRecoveries();
+
+ try {
+ registerAllCoresAsDown(descriptorsSupplier, false);
+ } catch (SessionExpiredException e) {
+ // zk has to reconnect and this will all be tried again
+ throw e;
+ } catch (Exception e) {
+ // this is really best effort - in case of races or failure cases where we now
+ // need to be the leader, if anything fails, just continue
+ log.warn("Exception while trying to register all cores as DOWN", e);
+ }
+
+ // we have to register as live first to pick up docs in the buffer
+ createEphemeralLiveNode();
+
+ List<CoreDescriptor> descriptors = descriptorsSupplier.get();
+      // re-register all descriptors
+ ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
+ if (descriptors != null) {
+ for (CoreDescriptor descriptor : descriptors) {
+ // TODO: we need to think carefully about what happens when it was a leader
+ // that was expired - as well as what to do about leaders/overseers with
+ // connection loss
+ try {
+ // unload solr cores that have been 'failed over'
+ throwErrorIfReplicaReplaced(descriptor);
+
+ if (executorService != null) {
+ executorService.submit(new RegisterCoreAsync(descriptor, true, true));
+ } else {
+ register(descriptor.getName(), descriptor, true, true, false);
+ }
+ } catch (Exception e) {
+ SolrException.log(log, "Error registering SolrCore", e);
+ }
+ }
+ }
+
+ // notify any other objects that need to know when the session was re-connected
+ HashSet<OnReconnect> clonedListeners;
+ synchronized (reconnectListeners) {
+ clonedListeners = new HashSet<>(reconnectListeners);
+ }
+ // the OnReconnect operation can be expensive per listener, so do that async in
+ // the background
+ for (OnReconnect listener : clonedListeners) {
+ try {
+ if (executorService != null) {
+ executorService.submit(new OnReconnectNotifyAsync(listener));
+ } else {
+ listener.command();
+ }
+ } catch (Exception exc) {
+ // not much we can do here other than warn in the log
+ log.warn(
+ "Error when notifying OnReconnect listener {} after session re-connected.",
+ listener,
+ exc);
+ }
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
+ } catch (SessionExpiredException e) {
+ throw e;
+ } catch (Exception e) {
+ SolrException.log(log, "", e);
+ throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
+ }
+ }
+
/**
* Verifies if /clusterstate.json exists in ZooKeeper, and if it does and is not empty, refuses
* to start and outputs a helpful message regarding collection migration.
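A side note on the listener notification in the new onReconnect(): the set of reconnect listeners is snapshotted inside the synchronized block (now via new HashSet<>(reconnectListeners) instead of an unchecked clone() cast) and the callbacks are invoked outside the lock. A small sketch of that copy-then-notify pattern, with hypothetical names (ListenerRegistry, fire):

    import java.util.HashSet;
    import java.util.Set;

    public class ListenerRegistry {
      private final Set<Runnable> listeners = new HashSet<>();

      public void addListener(Runnable l) {
        synchronized (listeners) {
          listeners.add(l);
        }
      }

      public void fire() {
        Set<Runnable> snapshot;
        synchronized (listeners) {
          // The copy constructor avoids the unchecked (HashSet<T>) clone() cast.
          snapshot = new HashSet<>(listeners);
        }
        // Invoke outside the lock so a slow or re-entrant listener cannot
        // block threads that are adding or removing listeners.
        for (Runnable l : snapshot) {
          try {
            l.run();
          } catch (Exception e) {
            System.err.println("listener failed: " + e);
          }
        }
      }

      public static void main(String[] args) {
        ListenerRegistry registry = new ListenerRegistry();
        registry.addListener(() -> System.out.println("session re-connected"));
        registry.fire();
      }
    }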