You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/15 23:17:51 UTC

[lucene-solr] 02/02: @189 - Keep improving delete node/replica. Don't try to delete our local empty index during replication when it's unnecessary.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit e9752c2c894720ac9c0682e44676c0a523819a24
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Jul 15 18:17:27 2020 -0500

    @189 - Keep improving delete node/replica. Don't try to delete our local empty index during replication when it's unnecessary.
---
 .../java/org/apache/solr/cloud/RecoveryStrategy.java    | 17 +++++++++++++++--
 .../apache/solr/cloud/ShardLeaderElectionContext.java   |  2 ++
 .../src/java/org/apache/solr/cloud/ZkController.java    | 13 +++++++++++--
 .../solr/cloud/api/collections/DeleteNodeCmd.java       |  7 -------
 .../solr/cloud/api/collections/DeleteReplicaCmd.java    | 14 ++++++--------
 .../collections/OverseerCollectionMessageHandler.java   |  2 +-
 .../solr/cloud/autoscaling/TriggerEventQueue.java       |  5 +++--
 .../org/apache/solr/core/CachingDirectoryFactory.java   |  3 +++
 .../src/java/org/apache/solr/core/CoreContainer.java    | 12 ++++++------
 .../src/java/org/apache/solr/handler/IndexFetcher.java  | 13 +++++++------
 .../solr/client/solrj/impl/BaseCloudSolrClient.java     |  7 +------
 .../src/java/org/apache/solr/SolrTestCase.java          |  4 +++-
 12 files changed, 58 insertions(+), 41 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 01528c3..b5372c0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -58,6 +58,7 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.IndexFetcher;
 import org.apache.solr.handler.ReplicationHandler;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
@@ -272,7 +273,17 @@ public class RecoveryStrategy implements Runnable, Closeable {
     }
 
     if (isClosed()) return; // we check closed on return
-    boolean success = replicationHandler.doFetch(solrParams, false).getSuccessful();
+    boolean success = false;
+    IndexFetcher.IndexFetchResult result = replicationHandler.doFetch(solrParams, false);
+
+    if (result.getMessage().equals(IndexFetcher.IndexFetchResult.FAILED_BY_INTERRUPT_MESSAGE)) {
+      log.info("Interrupted, stopping recovery");
+      return;
+    }
+
+    if (result.getSuccessful()) {
+      success= true;
+    }
 
     if (!success) {
       throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
@@ -326,7 +337,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   @Override
   final public void run() {
-
+    if (cc.isShutDown()) {
+      return;
+    }
     // set request info for logging
     try (SolrCore core = cc.getCore(coreName)) {
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index 77e56c8..6940b10 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -311,6 +311,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
           }
           log.info("I am the new leader: " + ZkCoreNodeProps.getCoreUrl(leaderProps) + " " + shardId);
 
+        } catch (AlreadyClosedException | InterruptedException e) {
+          log.info("Already closed or interrupted, bailing..");
         } catch (Exception e) {
           SolrException.log(log, "There was a problem trying to register as the leader", e);
           ParWork.propegateInterrupt(e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 96c0d0a..abcf584 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -395,6 +395,15 @@ public class ZkController implements Closeable {
     assert ObjectReleaseTracker.track(this);
   }
 
+  public void closeLeaderContext(CoreDescriptor cd) {
+    String collection = cd.getCloudDescriptor().getCollectionName();
+    final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+
+    ContextKey contextKey = new ContextKey(collection, coreNodeName);
+    ElectionContext context = electionContexts.get(contextKey);
+    context.close();
+  }
+
   public void start() {
 
     String zkCredentialsProviderClass = cloudConfig.getZkCredentialsProviderClass();
@@ -811,7 +820,7 @@ public class ZkController implements Closeable {
   }
 
   boolean isClosed() {
-    return isClosed;
+    return isClosed || getCoreContainer().isShutDown();
   }
 
   /**
@@ -2742,7 +2751,7 @@ public class ZkController implements Closeable {
     @Override
     // synchronized due to SOLR-11535
     public synchronized boolean onStateChanged(DocCollection collectionState) {
-      if (isClosed) { // don't accidentally delete cores on shutdown due to unreliable state
+      if (isClosed()) { // don't accidentally delete cores on shutdown due to unreliable state
         return true;
       }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
index 400496a..ad16dee 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
@@ -100,7 +100,6 @@ public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
                               OverseerCollectionMessageHandler ocmh,
                               String node,
                               String async) throws InterruptedException {
-    CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size());
     try (ParWork worker = new ParWork("cleanupReplicas")) {
       for (ZkNodeProps sReplica : sourceReplicas) {
         worker.collect(() -> {
@@ -114,7 +113,6 @@ public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
           try {
             if (async != null) sourceReplica = sourceReplica.plus(ASYNC, async);
             ((DeleteReplicaCmd) ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, sourceReplica.plus("parallel", "true"), deleteResult, () -> {
-              cleanupLatch.countDown();
               if (deleteResult.get("failure") != null) {
                 synchronized (results) {
 
@@ -125,21 +123,16 @@ public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
             });
           } catch (KeeperException e) {
             log.warn("Error deleting ", e);
-            cleanupLatch.countDown();
           } catch (InterruptedException e) {
             ParWork.propegateInterrupt(e);
-            cleanupLatch.countDown();
           }catch (Exception e) {
             log.warn("Error deleting ", e);
-            cleanupLatch.countDown();
             throw e;
           }
         });
       }
       worker.addCollect("deleteNodeReplicas");
     }
-    if (log.isDebugEnabled()) log.debug("Waiting for delete node action to complete");
-    cleanupLatch.await(5, TimeUnit.MINUTES);
   }
 
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 64345ad..ece03c9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -73,9 +73,9 @@ public class DeleteReplicaCmd implements Cmd {
   @SuppressWarnings("unchecked")
   void deleteReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete)
           throws KeeperException, InterruptedException {
-    if (log.isDebugEnabled()) {
-      log.debug("deleteReplica() : {}", Utils.toJSONString(message));
-    }
+
+    log.info("deleteReplica() : {}", Utils.toJSONString(message));
+
     boolean parallel = message.getBool("parallel", false);
 
     //If a count is specified the strategy needs be different
@@ -84,7 +84,6 @@ public class DeleteReplicaCmd implements Cmd {
       return;
     }
 
-
     ocmh.checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
     String extCollectionName = message.getStr(COLLECTION_PROP);
     String shard = message.getStr(SHARD_ID_PROP);
@@ -106,7 +105,6 @@ public class DeleteReplicaCmd implements Cmd {
     }
 
     deleteCore(slice, collectionName, replicaName, message, shard, results, onComplete,  parallel);
-
   }
 
 
@@ -221,7 +219,7 @@ public class DeleteReplicaCmd implements Cmd {
 
   @SuppressWarnings({"unchecked"})
   void deleteCore(Slice slice, String collectionName, String replicaName,ZkNodeProps message, String shard, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete, boolean parallel) throws KeeperException, InterruptedException {
-
+    log.info("delete core {}", replicaName);
     Replica replica = slice.getReplica(replicaName);
     if (replica == null) {
       ArrayList<String> l = new ArrayList<>();
@@ -262,10 +260,10 @@ public class DeleteReplicaCmd implements Cmd {
       try {
         if (isLive) {
           shardRequestTracker.processResponses(results, shardHandler, false, null);
+          // try and ensure core info is removed from cluster state
+
         }
-        // try and ensure core info is removed from cluster state
         ocmh.deleteCoreNode(collectionName, replicaName, replica, core);
-        ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 5000);
       } catch (Exception e) {
         SolrZkClient.checkInterrupted(e);
         results.add("failure", "Could not complete delete " + e.getMessage());
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 097a996..fb7408a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -432,7 +432,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting for nodes to go away");
     }
 
-    return true;
+    return false;
   }
 
   void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws Exception {
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
index ec41495..9dc2794 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
@@ -25,6 +25,7 @@ import org.apache.solr.client.solrj.cloud.DistributedQueue;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.cloud.Stats;
 import org.apache.solr.common.AlreadyClosedException;
+import org.apache.solr.common.ParWork;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.common.util.TimeSource;
@@ -81,8 +82,8 @@ public class TriggerEventQueue {
         }
       }
     } 
-    catch (AlreadyClosedException e) {
-      
+    catch (AlreadyClosedException | InterruptedException e) {
+      ParWork.propegateInterrupt(e);
     }
     catch (Exception e) {
       log.warn("Exception peeking queue of trigger {}", triggerName, e);
diff --git a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
index d877c17..fa24a0a 100644
--- a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
@@ -215,6 +215,9 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
             }
           }
           assert val.refCnt == 0 : val.refCnt;
+        } catch (InterruptedException e) {
+          ParWork.propegateInterrupt("Interrupted closing directory", e);
+          return;
         } catch (Exception e) {
           ParWork.propegateInterrupt("Error closing directory", e);
           throw new SolrException(ErrorCode.SERVER_ERROR, "Error closing directory");
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 0b63de5..7b2a543 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -37,17 +37,12 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -133,7 +128,6 @@ import org.apache.solr.update.SolrCoreState;
 import org.apache.solr.update.UpdateShardHandler;
 import org.apache.solr.common.util.OrderedExecutor;
 import org.apache.solr.util.RefCounted;
-import org.apache.solr.util.stats.MetricUtils;
 import org.apache.zookeeper.KeeperException;
 import org.eclipse.jetty.util.BlockingArrayQueue;
 import org.slf4j.Logger;
@@ -1762,6 +1756,12 @@ public class CoreContainer implements Closeable {
     if (cd == null) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Cannot unload non-existent core [" + name + "]");
     }
+
+    if (isZooKeeperAware()) {
+      getZkController().closeLeaderContext(cd);
+      getZkController().stopReplicationFromLeader(cd.getName());
+    }
+
     SolrCore core = null;
     boolean close;
     try {
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index f791ccd..a6e6a31 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -170,7 +170,7 @@ public class IndexFetcher {
 
   private boolean downloadTlogFiles = false;
 
-  private boolean skipCommitOnMasterVersionZero = true;
+  private boolean skipCommitOnMasterVersionZero = false;
 
   private boolean clearLocalIndexFirst = false;
 
@@ -456,13 +456,14 @@ public class IndexFetcher {
         }
       }
 
+      long slaveVersion = IndexDeletionPolicyWrapper.getCommitTimestamp(commit);
       if (log.isInfoEnabled()) {
         log.info("Slave's generation: {}", commit.getGeneration());
-        log.info("Slave's version: {}", IndexDeletionPolicyWrapper.getCommitTimestamp(commit)); // logOK
+        log.info("Slave's version: {}", slaveVersion); // logOK
       }
 
       if (latestVersion == 0L) {
-        if (commit.getGeneration() != 0) {
+        if (commit.getGeneration() > 1 || slaveVersion > 0) {
           // since we won't get the files for an empty index,
           // we just clear ours and commit
           log.info("New index in Master. Deleting mine...");
@@ -563,8 +564,8 @@ public class IndexFetcher {
             indexWriter.deleteUnusedFiles();
             while (hasUnusedFiles(indexDir, commit)) {
               indexWriter.deleteUnusedFiles();
-              log.info("Sleeping for 1000ms to wait for unused lucene index files to be delete-able");
-              Thread.sleep(1000);
+              log.info("Sleeping for 250ms to wait for unused lucene index files to be delete-able");
+              Thread.sleep(250);
               c++;
               if (c >= 30)  {
                 log.warn("IndexFetcher unable to cleanup unused lucene index files so we must do a full copy instead");
@@ -573,7 +574,7 @@ public class IndexFetcher {
               }
             }
             if (c > 0)  {
-              log.info("IndexFetcher slept for {}ms for unused lucene index files to be delete-able", c * 1000);
+              log.info("IndexFetcher slept for {}ms for unused lucene index files to be delete-able", c * 250);
             }
           } finally {
             writer.decref();
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index 13643c1..0a1750d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -1097,14 +1097,13 @@ public abstract class BaseCloudSolrClient extends SolrClient {
     } else if (action != null && request.getParams().get(CoreAdminParams.ACTION).equals(CollectionParams.CollectionAction.DELETENODE.toString())) {
       // TODO: make efficient, timeout
       String node = request.getParams().get("node");
-      try {
+
         boolean wait = true;
         while (wait) {
           ClusterState clusterState = getZkStateReader().getClusterState();
           for (DocCollection docCollection : clusterState.getCollectionsMap().values()) {
             for (Replica replica : docCollection.getReplicas()) {
               if (replica.getNodeName().equals(node)) {
-                Thread.sleep(100);
                 continue;
               }
             }
@@ -1112,10 +1111,6 @@ public abstract class BaseCloudSolrClient extends SolrClient {
           break;
         }
 
-      } catch (InterruptedException e) {
-        ParWork.propegateInterrupt(e);
-        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
-      }
     }
   }
 
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index b57cee6..923d7eb 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -377,6 +377,7 @@ public class SolrTestCase extends LuceneTestCase {
       System.out.println("Show Close Times");
       Class<? extends Object> clazz = null;
       Long tooLongTime = 0L;
+      String times = null;
       try {
         synchronized (TimeTracker.CLOSE_TIMES) {
           Map<String, TimeTracker> closeTimes = TimeTracker.CLOSE_TIMES;
@@ -385,6 +386,7 @@ public class SolrTestCase extends LuceneTestCase {
             if (closeTime.getElapsedMS() > closeTimeout) {
               tooLongTime = closeTime.getElapsedMS();
               clazz = closeTime.getClazz();
+              times = closeTime.getCloseTimes();
             }
             // turn off until layout is fixed again
             // closeTime.printCloseTimes();
@@ -397,7 +399,7 @@ public class SolrTestCase extends LuceneTestCase {
 
       if (clazz != null) {
         // nocommit - leave this on
-        fail("A " + clazz.getName() + " took too long to close: " + tooLongTime);
+        fail("A " + clazz.getName() + " took too long to close: " + tooLongTime + "\n" + times);
       }
 
     }