You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original archive page and is not preserved in this plain-text rendering.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/21 21:33:07 UTC

[lucene-solr] 05/05: @1275 Fix wait for down and some lock and recovery issues.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 561cdac5214c9927c8c1f2adeae2da3d0c5e0c26
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Jan 21 15:17:01 2021 -0600

    @1275 Fix wait for down and some lock and recovery issues.
---
 .../org/apache/solr/cloud/RecoveryStrategy.java    | 68 +++++++++++++---------
 .../solr/cloud/ShardLeaderElectionContext.java     |  2 +
 .../solr/cloud/ShardLeaderElectionContextBase.java |  9 ++-
 .../java/org/apache/solr/cloud/StatePublisher.java |  2 +-
 .../java/org/apache/solr/cloud/ZkController.java   | 10 +++-
 .../java/org/apache/solr/core/CoreContainer.java   | 49 ++++++++--------
 .../src/java/org/apache/solr/core/SolrCore.java    |  2 +-
 .../java/org/apache/solr/handler/IndexFetcher.java |  1 +
 .../apache/solr/handler/admin/PrepRecoveryOp.java  |  2 +-
 .../handler/component/RealTimeGetComponent.java    |  3 -
 .../apache/solr/update/DefaultSolrCoreState.java   | 18 +++---
 .../java/org/apache/solr/update/SolrCoreState.java |  2 +-
 .../org/apache/solr/update/TimedVersionBucket.java |  2 +-
 .../processor/DistributedZkUpdateProcessor.java    |  2 +-
 14 files changed, 96 insertions(+), 76 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 32aa54b..1272ad8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -337,7 +337,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
   }
 
   final public void doRecovery(SolrCore core) throws Exception {
+    int tries = 0;
     while (!isClosed()) {
+      tries++;
       try {
         try {
           if (prevSendPreRecoveryHttpUriRequest != null) {
@@ -370,11 +372,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
         }
 
         if (successfulRecovery) {
+          close = true;
           break;
+        } else {
+          log.info("Trying another loop to recover after failing try={}", tries);
         }
 
       } catch (Exception e) {
-        log.info("Exception trying to recover, try again", e);
+        log.info("Exception trying to recover, try again try={}", tries, e);
       }
     }
   }
@@ -593,10 +598,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
         CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
         leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
 
-        if (isClosed()) {
-          throw new AlreadyClosedException();
-        }
-
         log.info("Begin buffering updates. core=[{}]", coreName);
         // recalling buffer updates will drop the old buffer tlog
         ulog.bufferUpdates();
@@ -632,26 +633,34 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
           // System.out.println("Attempting to PeerSync from " + leaderUrl
           // + " i am:" + zkController.getNodeName());
-          boolean syncSuccess;
-          try (PeerSyncWithLeader peerSyncWithLeader = new PeerSyncWithLeader(core, leader.getCoreUrl(), ulog.getNumRecordsToKeep())) {
-            syncSuccess = peerSyncWithLeader.sync(recentVersions).isSuccess();
-          }
-          if (syncSuccess) {
-            SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
-            log.info("PeerSync was successful, commit to force open a new searcher");
-            // force open a new searcher
-            core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
-            req.close();
-            log.info("PeerSync stage of recovery was successful.");
-
-            // solrcloud_debug
-            // cloudDebugLog(core, "synced");
-
-            log.info("Replaying updates buffered during PeerSync.");
-            replay(core);
+          try {
+            boolean syncSuccess;
+            try (PeerSyncWithLeader peerSyncWithLeader = new PeerSyncWithLeader(core, leader.getCoreUrl(), ulog.getNumRecordsToKeep())) {
+              syncSuccess = peerSyncWithLeader.sync(recentVersions).isSuccess();
+            }
+            if (syncSuccess) {
+              SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
+              log.info("PeerSync was successful, commit to force open a new searcher");
+              // force open a new searcher
+              core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+              req.close();
+              log.info("PeerSync stage of recovery was successful.");
+
+              // solrcloud_debug
+              // cloudDebugLog(core, "synced");
+
+              log.info("Replaying updates buffered during PeerSync.");
+              replay(core);
+
+              // sync success
+              successfulRecovery = true;
+            } else {
+              successfulRecovery = false;
+            }
 
-            // sync success
-            successfulRecovery = true;
+          } catch (Exception e) {
+            log.error("PeerSync exception", e);
+            successfulRecovery = false;
           }
 
           if (!successfulRecovery) {
@@ -681,6 +690,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
             log.info("Interrupted or already closed, bailing on recovery");
             close = true;
             successfulRecovery = false;
+            break;
           } catch (Exception e) {
             successfulRecovery = false;
             log.error("Error while trying to recover", e);
@@ -710,14 +720,16 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
             zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
             publishedActive = true;
+            close = true;
 
           } catch (AlreadyClosedException e) {
             log.error("Already closed");
             successfulRecovery = false;
+            close = true;
           } catch (Exception e) {
             log.error("Could not publish as ACTIVE after successful recovery", e);
             successfulRecovery = false;
-           // core.getSolrCoreState().doRecovery(core);
+            close = false;
           }
 
 
@@ -731,7 +743,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
       }
 
-      if (!successfulRecovery) {
+      if (!successfulRecovery && !isClosed()) {
         // lets pause for a moment and we need to try again...
         // TODO: we don't want to retry for some problems?
         // Or do a fall off retry...
@@ -754,9 +766,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
         }
       }
 
-      if (!successfulRecovery) {
+      if (!successfulRecovery && !isClosed()) {
         waitForRetry();
-      } else {
+      } else if (successfulRecovery) {
         break;
       }
     }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index 8256084..56eb674 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -319,6 +319,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
     log.info("There may be a better leader candidate than us - will cancel election, rejoin election, and kick off recovery");
 
     leaderElector.retryElection(false);
+
+    core.getSolrCoreState().doRecovery(core);
   }
 
   public String getShardId() {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index b11c180..abaee54 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -181,11 +181,16 @@ class ShardLeaderElectionContextBase extends ElectionContext {
       //assert leaderZkNodeParentVersion != null;
 
     } catch (NoNodeException e) {
+      log.warn("No node exists for election", e);
       throw new AlreadyClosedException("No node exists for election");
     } catch (KeeperException.NodeExistsException e) {
-      throw new AlreadyClosedException("Node already exists for election");
+      log.warn("Node already exists for election", e);
+
+      zkClient.delete(leaderPath, -1);
+
+      runLeaderProcess(context, weAreReplacement, pauseBeforeStartMs);
     } catch (Throwable t) {
-      ParWork.propagateInterrupt(t);
+      log.warn("Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed: ", t);
       throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed: " + errors, t);
     }
     return true;
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 341ffa3..45d5fda 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -148,7 +148,7 @@ public class StatePublisher implements Closeable {
         Replica replica = zkStateReader.getClusterState().getCollection(collection).getReplica(core);
         String lastState = stateCache.get(core);
         // nocommit
-        if (collection != null && replica != null && state.equals(lastState) && replica.getState().toString().equals(state)) {
+        if (collection != null && replica != null && !state.equals(Replica.State.ACTIVE) && state.equals(lastState) && replica.getState().toString().equals(state)) {
           log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
           // nocommit
           return;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 8ebdfe9..c4e9db6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -417,6 +417,14 @@ public class ZkController implements Closeable, Runnable {
 
     started = true;
 
+    try {
+      if (zkClient.exists( ZkStateReader.LIVE_NODES_ZKNODE + "/" + getNodeName())) {
+        removeEphemeralLiveNode();
+      }
+    } catch (Exception e) {
+      ParWork.propagateInterrupt("Error Removing ephemeral live node. Continuing to close CoreContainer", e);
+    }
+
     this.overseer = new Overseer(cc.getUpdateShardHandler(), CommonParams.CORES_HANDLER_PATH, this, cloudConfig);
     try {
       this.overseerRunningMap = Overseer.getRunningMap(zkClient);
@@ -1407,7 +1415,7 @@ public class ZkController implements Closeable, Runnable {
         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
       }
 
-      if (log.isDebugEnabled()) log.debug("Wait to see leader for {}, {}", collection, shardId);
+      log.info("Wait to see leader for {}, {}", collection, shardId);
       Replica leader = null;
       for (int i = 0; i < 30; i++) {
 //        if (leaderElector.isLeader()) {
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 61566b6..f7be9a1 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -28,6 +28,7 @@ import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
 import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
@@ -885,6 +886,27 @@ public class CoreContainer implements Closeable {
       status |= CORE_DISCOVERY_COMPLETE;
 
       for (final CoreDescriptor cd : cds) {
+        if (isZooKeeperAware()) {
+          String collection = cd.getCollectionName();
+          try {
+            zkSys.zkController.zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
+              if (c != null) {
+                Replica replica = c.getReplica(cd.getName());
+
+                if (replica.getState().equals(State.DOWN)) {
+                  return true;
+                }
+
+              }
+              return false;
+            });
+          } catch (InterruptedException e) {
+            ParWork.propagateInterrupt(e);
+          } catch (TimeoutException e) {
+            log.error("Timeout", e);
+          }
+        }
+
         if (log.isDebugEnabled()) log.debug("Process core descriptor {} {} {}", cd.getName(), cd.isTransient(), cd.isLoadOnStartup());
         if (cd.isTransient() || !cd.isLoadOnStartup()) {
           solrCores.addCoreDescriptor(cd);
@@ -914,32 +936,9 @@ public class CoreContainer implements Closeable {
           }));
         }
       }
-      if (zkSys != null && zkSys.getZkController() != null) {
+      if (isZooKeeperAware()) {
+
         ParWork.getRootSharedExecutor().submit(() -> {
-          Collection<SolrCore> cores = getCores(); // TODO use the cores we just launched, this may not be populated yet
-          for (SolrCore core : cores) {
-            CoreDescriptor desc = core.getCoreDescriptor();
-            String collection = desc.getCollectionName();
-            try {
-              zkSys.zkController.zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
-                if (c != null) {
-                  List<Replica> replicas = c.getReplicas();
-                  for (Replica replica : replicas) {
-                    if (replica.getNodeName().equals(zkSys.zkController.getNodeName())) {
-                      if (!replica.getState().equals(Replica.State.DOWN)) {
-                        return false;
-                      }
-                    }
-                  }
-                }
-                return true;
-              });
-            } catch (InterruptedException e) {
-              ParWork.propagateInterrupt(e);
-            } catch (TimeoutException e) {
-              log.error("Timeout", e);
-            }
-          }
           zkSys.getZkController().createEphemeralLiveNode();
         });
       }
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 8d06233..29bc5bc 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -748,7 +748,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
           // another recovery waiting behind us, let it run now instead of after we finish
           log.info("Skipping reload because there is another in line behind");
           lock.unlock();
-          Thread.sleep(50);
+          Thread.sleep(10);
           lock.lock();
           lock.unlock();
           return null;
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index ab4821b..972d012 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -1108,6 +1108,7 @@ public class IndexFetcher {
                   throw e;
                 } catch (Exception e) {
                   log.error("Problem downloading file {}", file, e);
+                  throw e;
                 } finally {
                   fileFetchRequests.remove(file.get(NAME));
                 }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 45ef666..f83075d 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -80,7 +80,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
         final Replica replica = c.getReplica(cname);
 
         if (replica != null) {
-          if (replica.getState() == waitForState || replica.getState() == Replica.State.ACTIVE && coreContainer.getZkController().getZkStateReader().isNodeLive(replica.getNodeName())) {
+          if (replica.getState() == waitForState && coreContainer.getZkController().getZkStateReader().isNodeLive(replica.getNodeName())) {
             // if (log.isDebugEnabled()) log.debug("replica={} state={} waitForState={}", replica, replica.getState(), waitForState);
             log.info("replica={} state={} waitForState={}", replica, replica.getState(), waitForState);
             return true;
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index a7d71d1..63eff4b 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -1144,9 +1144,6 @@ public class RealTimeGetComponent extends SearchComponent
       // TODO: more complex response?
       rb.rsp.add("sync", success);
 
-      if (!success && rb.req.getCore().getCoreContainer().isZooKeeperAware() && !rb.req.getCore().getSolrCoreState().isRecoverying()) {
-        rb.req.getCore().getSolrCoreState().doRecovery(rb.req.getCore().getCoreContainer(), rb.req.getCore().getCoreDescriptor());
-      }
     } catch (IOException e) {
       log.error("Error while closing", e);
     }
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index 475abe4..7c1f4f5 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -52,7 +52,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
 
   private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
 
-  private final ReentrantLock recoveryLock = new ReentrantLock(true);
+  private final ReentrantLock recoveryLock = new ReentrantLock(false);
 
   private final ActionThrottle recoveryThrottle = new ActionThrottle("recovery", Integer.getInteger("solr.recoveryThrottle", 0));
 
@@ -190,7 +190,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
     boolean acquired = false;
     do {
       try {
-        acquired = lock.tryLock(100, TimeUnit.MILLISECONDS);
+        acquired = lock.tryLock() || lock.tryLock(100, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         log.warn("WARNING - Dangerous interrupt", e);
       }
@@ -346,25 +346,19 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
           return;
         }
 
-
         // if we can't get the lock, another recovery is running
         // we check to see if there is already one waiting to go
         // after the current one, and if there is, bail
         boolean locked = recoveryLock.tryLock();
 
-        if (!locked && recoveryWaiting.get() > 1) {
-          log.info("Skipping recovery because there is another already queued");
-          return;
-        }
-
 //        if (closed || prepForClose) {
 //          return;
 //        }
         if (!locked) {
           recoveryWaiting.incrementAndGet();
           if (log.isDebugEnabled()) log.debug("Wait for recovery lock");
-          cancelRecovery();
-          while (!recoveryLock.tryLock(250, TimeUnit.MILLISECONDS)) {
+          cancelRecovery(true, false);
+          while (!(recoveryLock.tryLock() || recoveryLock.tryLock(500, TimeUnit.MILLISECONDS))) {
             if (closed || prepForClose) {
               log.warn("Skipping recovery because we are closed");
               recoveryWaiting.decrementAndGet();
@@ -379,12 +373,13 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
           }
         }
 
+        recoverying = true;
+
         // to be air tight we must also check after lock
         if (prepForClose || closed || corecontainer.isShutDown()) {
           log.info("Skipping recovery due to being closed");
           return;
         }
-        recoverying = true;
 
         recoveryThrottle.minimumWaitBetweenActions();
         recoveryThrottle.markAttemptingAction();
@@ -435,6 +430,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
 
   @Override
   public void cancelRecovery(boolean wait, boolean prepForClose) {
+    log.info("Cancel recovery");
     recoverying = false;
     
     if (prepForClose) {
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index 556ac75..c53fe21 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -44,7 +44,7 @@ public abstract class SolrCoreState {
   
   protected volatile boolean closed = false;
   private final Object updateLock = new Object();
-  private final ReentrantLock reloadLock = new ReentrantLock(true);
+  private final ReentrantLock reloadLock = new ReentrantLock(false);
   
   public Object getUpdateLock() {
     return updateLock;
diff --git a/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java b/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
index 94e7551..abeca02 100644
--- a/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
@@ -44,7 +44,7 @@ public class TimedVersionBucket extends VersionBucket {
     boolean success = false;
 
     try {
-      success = lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
+      success = lock.tryLock() || lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
 
       if (success) {
         return function.apply();
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 7ec4dd6..7e8faeb 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -1036,7 +1036,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
               ReentrantLock ruleExpiryLock = req.getCore().getRuleExpiryLock();
               if (!ruleExpiryLock.isLocked()) {
                 try {
-                  if (ruleExpiryLock.tryLock(10, TimeUnit.MILLISECONDS)) {
+                  if (ruleExpiryLock.tryLock() || ruleExpiryLock.tryLock(10, TimeUnit.MILLISECONDS)) {
                     log.info("Going to expire routing rule");
                     try {
                       // nocommit TODO: needs to use the statepublisher