You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by no...@apache.org on 2022/09/16 03:12:38 UTC

[solr] branch main updated: refactor out onReconnect into a method

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new d3a6de0284a refactor out onReconnect into a method
d3a6de0284a is described below

commit d3a6de0284a295994385bf1d061a9bd221044b20
Author: Noble Paul <no...@gmail.com>
AuthorDate: Fri Sep 16 13:12:27 2022 +1000

    refactor out onReconnect into a method
---
 .../java/org/apache/solr/cloud/ZkController.java   | 245 ++++++++++-----------
 1 file changed, 118 insertions(+), 127 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 7126ee7cba2..1307abfada8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -73,7 +73,6 @@ import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.StringUtils;
-import org.apache.solr.common.cloud.BeforeReconnect;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DefaultConnectionStrategy;
 import org.apache.solr.common.cloud.DefaultZkACLProvider;
@@ -382,133 +381,15 @@ public class ZkController implements Closeable {
             clientTimeout,
             zkClientConnectTimeout,
             strat,
-            // on reconnect, reload cloud info
-            new OnReconnect() {
-
-              @Override
-              public void command() throws SessionExpiredException {
-                log.info(
-                    "ZooKeeper session re-connected ... refreshing core states after session expiration.");
-                clearZkCollectionTerms();
-                try {
-                  // recreate our watchers first so that they exist even on any problems below
-                  zkStateReader.createClusterStateWatchersAndUpdate();
-
-                  // this is troublesome - we dont want to kill anything the old
-                  // leader accepted
-                  // though I guess sync will likely get those updates back? But
-                  // only if
-                  // he is involved in the sync, and he certainly may not be
-                  // ExecutorUtil.shutdownAndAwaitTermination(cc.getCmdDistribExecutor());
-                  // we need to create all of our lost watches
-
-                  // seems we dont need to do this again...
-                  // Overseer.createClientNodes(zkClient, getNodeName());
-
-                  // start the overseer first as following code may need it's processing
-                  if (!zkRunOnly) {
-                    ElectionContext context =
-                        new OverseerElectionContext(zkClient, overseer, getNodeName());
-
-                    ElectionContext prevContext = overseerElector.getContext();
-                    if (prevContext != null) {
-                      prevContext.cancelElection();
-                      prevContext.close();
-                    }
-
-                    overseerElector.setup(context);
-
-                    if (cc.nodeRoles.isOverseerAllowedOrPreferred()) {
-                      overseerElector.joinElection(context, true);
-                    }
-                  }
-
-                  cc.cancelCoreRecoveries();
-
-                  try {
-                    registerAllCoresAsDown(descriptorsSupplier, false);
-                  } catch (SessionExpiredException e) {
-                    // zk has to reconnect and this will all be tried again
-                    throw e;
-                  } catch (Exception e) {
-                    // this is really best effort - in case of races or failure cases where we now
-                    // need to be the leader, if anything fails, just continue
-                    log.warn("Exception while trying to register all cores as DOWN", e);
-                  }
-
-                  // we have to register as live first to pick up docs in the buffer
-                  createEphemeralLiveNode();
-
-                  List<CoreDescriptor> descriptors = descriptorsSupplier.get();
-                  // re register all descriptors
-                  ExecutorService executorService =
-                      (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
-                  if (descriptors != null) {
-                    for (CoreDescriptor descriptor : descriptors) {
-                      // TODO: we need to think carefully about what happens when it was a leader
-                      // that was expired - as well as what to do about leaders/overseers with
-                      // connection loss
-                      try {
-                        // unload solrcores that have been 'failed over'
-                        throwErrorIfReplicaReplaced(descriptor);
-
-                        if (executorService != null) {
-                          executorService.submit(new RegisterCoreAsync(descriptor, true, true));
-                        } else {
-                          register(descriptor.getName(), descriptor, true, true, false);
-                        }
-                      } catch (Exception e) {
-                        SolrException.log(log, "Error registering SolrCore", e);
-                      }
-                    }
-                  }
-
-                  // notify any other objects that need to know when the session was re-connected
-                  HashSet<OnReconnect> clonedListeners;
-                  synchronized (reconnectListeners) {
-                    clonedListeners = (HashSet<OnReconnect>) reconnectListeners.clone();
-                  }
-                  // the OnReconnect operation can be expensive per listener, so do that async in
-                  // the background
-                  for (OnReconnect listener : clonedListeners) {
-                    try {
-                      if (executorService != null) {
-                        executorService.submit(new OnReconnectNotifyAsync(listener));
-                      } else {
-                        listener.command();
-                      }
-                    } catch (Exception exc) {
-                      // not much we can do here other than warn in the log
-                      log.warn(
-                          "Error when notifying OnReconnect listener {} after session re-connected.",
-                          listener,
-                          exc);
-                    }
-                  }
-                } catch (InterruptedException e) {
-                  // Restore the interrupted status
-                  Thread.currentThread().interrupt();
-                  throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-                } catch (SessionExpiredException e) {
-                  throw e;
-                } catch (Exception e) {
-                  SolrException.log(log, "", e);
-                  throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-                }
-              }
-            },
-            new BeforeReconnect() {
-
-              @Override
-              public void command() {
-                try {
-                  ZkController.this.overseer.close();
-                } catch (Exception e) {
-                  log.error("Error trying to stop any Overseer threads", e);
-                }
-                closeOutstandingElections(descriptorsSupplier);
-                markAllAsNotLeader(descriptorsSupplier);
+            () -> onReconnect(descriptorsSupplier),
+            () -> {
+              try {
+                this.overseer.close();
+              } catch (Exception e) {
+                log.error("Error trying to stop any Overseer threads", e);
               }
+              closeOutstandingElections(descriptorsSupplier);
+              markAllAsNotLeader(descriptorsSupplier);
             },
             zkACLProvider,
             cc::isShutDown);
@@ -544,6 +425,116 @@ public class ZkController implements Closeable {
     assert ObjectReleaseTracker.track(this);
   }
 
+  private void onReconnect(Supplier<List<CoreDescriptor>> descriptorsSupplier)
+      throws SessionExpiredException {
+    // on reconnect, reload cloud info
+    log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
+    clearZkCollectionTerms();
+    try {
+      // recreate our watchers first so that they exist even on any problems below
+      zkStateReader.createClusterStateWatchersAndUpdate();
+
+      // this is troublesome - we don't want to kill anything the old
+      // leader accepted
+      // though I guess sync will likely get those updates back? But
+      // only if
+      // he is involved in the sync, and he certainly may not be
+      // ExecutorUtil.shutdownAndAwaitTermination(cc.getCmdDistribExecutor());
+      // we need to create all of our lost watches
+
+      // seems we don't need to do this again...
+      // Overseer.createClientNodes(zkClient, getNodeName());
+
+      // start the overseer first as following code may need it's processing
+      if (!zkRunOnly) {
+        ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
+
+        ElectionContext prevContext = overseerElector.getContext();
+        if (prevContext != null) {
+          prevContext.cancelElection();
+          prevContext.close();
+        }
+
+        overseerElector.setup(context);
+
+        if (cc.nodeRoles.isOverseerAllowedOrPreferred()) {
+          overseerElector.joinElection(context, true);
+        }
+      }
+
+      cc.cancelCoreRecoveries();
+
+      try {
+        registerAllCoresAsDown(descriptorsSupplier, false);
+      } catch (SessionExpiredException e) {
+        // zk has to reconnect and this will all be tried again
+        throw e;
+      } catch (Exception e) {
+        // this is really best effort - in case of races or failure cases where we now
+        // need to be the leader, if anything fails, just continue
+        log.warn("Exception while trying to register all cores as DOWN", e);
+      }
+
+      // we have to register as live first to pick up docs in the buffer
+      createEphemeralLiveNode();
+
+      List<CoreDescriptor> descriptors = descriptorsSupplier.get();
+      // re register all descriptors
+      ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
+      if (descriptors != null) {
+        for (CoreDescriptor descriptor : descriptors) {
+          // TODO: we need to think carefully about what happens when it was a leader
+          // that was expired - as well as what to do about leaders/overseers with
+          // connection loss
+          try {
+            // unload solr cores that have been 'failed over'
+            throwErrorIfReplicaReplaced(descriptor);
+
+            if (executorService != null) {
+              executorService.submit(new RegisterCoreAsync(descriptor, true, true));
+            } else {
+              register(descriptor.getName(), descriptor, true, true, false);
+            }
+          } catch (Exception e) {
+            SolrException.log(log, "Error registering SolrCore", e);
+          }
+        }
+      }
+
+      // notify any other objects that need to know when the session was re-connected
+      HashSet<OnReconnect> clonedListeners;
+      synchronized (reconnectListeners) {
+        clonedListeners = new HashSet<>(reconnectListeners);
+      }
+      // the OnReconnect operation can be expensive per listener, so do that async in
+      // the background
+      for (OnReconnect listener : clonedListeners) {
+        try {
+          if (executorService != null) {
+            executorService.submit(new OnReconnectNotifyAsync(listener));
+          } else {
+            listener.command();
+          }
+        } catch (Exception exc) {
+          // not much we can do here other than warn in the log
+          log.warn(
+              "Error when notifying OnReconnect listener {} after session re-connected.",
+              listener,
+              exc);
+        }
+      }
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+      throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
+    } catch (SessionExpiredException e) {
+      throw e;
+    } catch (Exception e) {
+      SolrException.log(log, "", e);
+      throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
+    }
+  }
+
   /**
    * Verifies if /clusterstate.json exists in Zookeepeer, and if it does and is not empty, refuses
    * to start and outputs a helpful message regarding collection migration.