You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/10 04:38:24 UTC

[lucene-solr] branch reference_impl_dev updated (bdcab72 -> b37b718)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a change to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


    from bdcab72  @1128 Fix up test.
     new f8708f0  @1129 Try this for bad schema persistence issue.
     new b38a92d  @1130 Tuning collection delete
     new ebdb2c3  @1131 Working out collections queue.
     new 76c4555  @1132 Don't lose a collection
     new b37b718  @1133 Bit of clusterstate management work.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/solr/cloud/LeaderElector.java  |  21 ++--
 .../src/java/org/apache/solr/cloud/Overseer.java   | 109 +++++++++++----------
 .../org/apache/solr/cloud/RecoveryStrategy.java    |   9 +-
 .../solr/cloud/ShardLeaderElectionContextBase.java |   2 +
 .../java/org/apache/solr/cloud/ZkController.java   |   4 +
 .../cloud/api/collections/CreateCollectionCmd.java |  41 ++++++--
 .../cloud/api/collections/DeleteCollectionCmd.java |  26 +++--
 .../apache/solr/cloud/overseer/SliceMutator.java   |   4 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |  44 +++++----
 .../java/org/apache/solr/core/CoreContainer.java   |   4 +
 .../apache/solr/handler/admin/PrepRecoveryOp.java  |  24 +++--
 .../org/apache/solr/schema/ManagedIndexSchema.java |  28 +++---
 .../solr/cloud/CollectionStateZnodeTest.java       |   1 +
 .../client/solrj/impl/BaseCloudSolrClient.java     |   2 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java |   8 +-
 15 files changed, 190 insertions(+), 137 deletions(-)


[lucene-solr] 02/05: @1130 Tuning collection delete

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit b38a92ddf07ec50a417f36ee7b9bb7d06d25afe8
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 20:42:04 2020 -0600

    @1130 Tuning collection delete
---
 .../src/java/org/apache/solr/cloud/Overseer.java   | 110 +++++++++++----------
 .../cloud/api/collections/DeleteCollectionCmd.java |  28 +++---
 .../org/apache/solr/common/cloud/SolrZkClient.java |   8 +-
 3 files changed, 77 insertions(+), 69 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3535dfb..4f73290 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -956,11 +956,11 @@ public class Overseer implements SolrCloseable {
 
           Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
 
-          ParWork.getRootSharedExecutor().submit(() -> {
+          overseer.getTaskExecutor().submit(() -> {
             try {
               runAsync(items, fullPaths, data);
             } catch (Exception e) {
-              log.error("failed processing collection queue items " + items);
+              log.error("failed processing collection queue items " + items, e);
             }
           });
         } finally {
@@ -969,73 +969,81 @@ public class Overseer implements SolrCloseable {
 
       }
 
-      private void runAsync(List<String> items, List<String> fullPaths, Map<String,byte[]> data) throws KeeperException {
+      private void runAsync(List<String> items, List<String> fullPaths, Map<String,byte[]> data) {
         for (Map.Entry<String,byte[]> entry : data.entrySet()) {
-          byte[] item = entry.getValue();
-          if (item == null) {
-            log.error("empty item {}", entry.getKey());
-            continue;
-          }
-
-          final ZkNodeProps message = ZkNodeProps.load(item);
           try {
-            String operation = message.getStr(Overseer.QUEUE_OPERATION);
-            if (operation == null) {
-              log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
+            byte[] item = entry.getValue();
+            if (item == null) {
+              log.error("empty item {}", entry.getKey());
               continue;
             }
 
-            final String asyncId = message.getStr(ASYNC);
+            final ZkNodeProps message = ZkNodeProps.load(item);
+            try {
+              String operation = message.getStr(Overseer.QUEUE_OPERATION);
+              if (operation == null) {
+                log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
+                continue;
+              }
 
-            OverseerSolrResponse response;
-            if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
-              response = configMessageHandler.processMessage(message, operation);
-            } else {
-              response = collMessageHandler.processMessage(message, operation);
-            }
+              final String asyncId = message.getStr(ASYNC);
 
-            //          try {
-            //            overseer.writePendingUpdates();
-            //          } catch (InterruptedException e) {
-            //            log.error("Overseer state update queue processing interrupted");
-            //            return;
-            //          }
+              OverseerSolrResponse response;
+              if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
+                response = configMessageHandler.processMessage(message, operation);
+              } else {
+                response = collMessageHandler.processMessage(message, operation);
+              }
 
-            log.info("response {}", response);
+              //          try {
+              //            overseer.writePendingUpdates();
+              //          } catch (InterruptedException e) {
+              //            log.error("Overseer state update queue processing interrupted");
+              //            return;
+              //          }
+
+              log.info("response {}", response);
+
+              if (asyncId != null) {
+                if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
+                  if (log.isDebugEnabled()) {
+                    log.debug("Updated failed map for task with id:[{}]", asyncId);
+                  }
+                  failureMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
+                } else {
+                  if (log.isDebugEnabled()) {
+                    log.debug("Updated completed map for task with zkid:[{}]", asyncId);
+                  }
+                  completedMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
 
-            if (asyncId != null) {
-              if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
-                if (log.isDebugEnabled()) {
-                  log.debug("Updated failed map for task with id:[{}]", asyncId);
                 }
-                failureMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
               } else {
-                if (log.isDebugEnabled()) {
-                  log.debug("Updated completed map for task with zkid:[{}]", asyncId);
-                }
-                completedMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
-
+                byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
+                String responsePath = Overseer.OVERSEER_COLLECTION_MAP_COMPLETED + "/" + OverseerTaskQueue.RESPONSE_PREFIX + entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1);
+                zkController.getZkClient().setData(responsePath, sdata, true);
+                log.debug("Completed task:[{}] {}", message, response.getResponse());
               }
-            } else {
-              byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
-              String responsePath = Overseer.OVERSEER_COLLECTION_MAP_COMPLETED + "/" + OverseerTaskQueue.RESPONSE_PREFIX + entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1);
-              zkController.getZkClient().setData(responsePath, sdata, true);
-              log.debug("Completed task:[{}] {}", message, response.getResponse());
+
+            } catch (InterruptedException e) {
+              log.error("Overseer state update queue processing interrupted");
+              return;
             }
 
-          } catch (InterruptedException e) {
-            log.error("Overseer state update queue processing interrupted");
-            return;
+          } catch (Exception e) {
+            log.warn("Exception deleting processed zk nodes", e);
           }
-        }
+          try {
+            for (String item : items) {
+              if (item.startsWith("qnr-")) {
+                fullPaths.remove(path + "/" + item);
+              }
+            }
 
-        for (String item : items) {
-          if (item.startsWith("qnr-")) {
-            fullPaths.remove(path + "/" + item);
+            zkController.getZkClient().delete(fullPaths, true);
+          } catch (Exception e) {
+            log.warn("Exception deleting processed zk nodes", e);
           }
         }
-
-        zkController.getZkClient().delete(fullPaths, true);
       }
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index f86dd55..2ac109e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -79,7 +79,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
   @Override
   public AddReplicaCmd.Response call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
-    log.info("delete collection called");
+    log.info("delete collection called {}", message);
     Object o = message.get(MaintainRoutedAliasCmd.INVOKED_BY_ROUTED_ALIAS);
     if (o != null) {
       ((Runnable)o).run(); // this will ensure the collection is removed from the alias before it disappears.
@@ -116,6 +116,14 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       SolrZkClient zkClient = zkStateReader.getZkClient();
       SolrSnapshotManager.cleanupCollectionLevelSnapshots(zkClient, collection);
 
+      log.info("Check if collection exists in zookeeper {}", collection);
+
+      if (!zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection)) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection");
+      }
+
+
+      log.info("Collection exists, remove it, {}", collection);
       // remove collection-level metrics history
       if (deleteHistory) {
         MetricsHistoryHandler historyHandler = ocmh.overseer.getCoreContainer().getMetricsHistoryHandler();
@@ -132,25 +140,15 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
       String asyncId = message.getStr(ASYNC);
 
-
       ZkNodeProps internalMsg = message.plus(NAME, collection);
 
-      if (!zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection)) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection");
-      }
-
-
-      clusterState = new ClusterStateMutator(ocmh.cloudManager) .deleteCollection(clusterState, collection);
+      clusterState = new ClusterStateMutator(ocmh.cloudManager).deleteCollection(clusterState, collection);
 
       shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
 
-      shardRequestTracker =
-          new OverseerCollectionMessageHandler.ShardRequestTracker(asyncId, message.getStr("operation"), ocmh.adminPath, zkStateReader,
-              ocmh.shardHandlerFactory, ocmh.overseer);
+      shardRequestTracker = new OverseerCollectionMessageHandler.ShardRequestTracker(asyncId, message.getStr("operation"), ocmh.adminPath, zkStateReader, ocmh.shardHandlerFactory, ocmh.overseer);
 
-      @SuppressWarnings({"unchecked"})
-      List<Replica> failedReplicas = ocmh.collectionCmd(internalMsg, params, results, null, asyncId, okayExceptions,
-          shardHandler, shardRequestTracker);
+      @SuppressWarnings({"unchecked"}) List<Replica> failedReplicas = ocmh.collectionCmd(internalMsg, params, results, null, asyncId, okayExceptions, shardHandler, shardRequestTracker);
 
       if (failedReplicas == null) {
         // TODO: handle this in any special way? more logging?
@@ -190,8 +188,6 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         if (finalShardHandler != null && finalShardRequestTracker != null) {
           try {
             finalShardRequestTracker.processResponses(results, finalShardHandler, false, null, okayExceptions);
-            // TODO: wait for delete collection?
-            zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (l, c) -> c == null);
           } catch (Exception e) {
             log.error("Exception waiting for results of delete collection cmd", e);
           }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 8b0fb93..f352a88 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -315,9 +315,13 @@ public class SolrZkClient implements Closeable {
       throws KeeperException, InterruptedException {
     ZooKeeper keeper = connManager.getKeeper();
     if (retryOnConnLoss) {
-      return zkCmdExecutor.retryOperation(() -> keeper.exists(path, null) != null);
+      Stat existsStat = zkCmdExecutor.retryOperation(() -> keeper.exists(path, null));
+      log.info("exists state return is {} {}", path, existsStat);
+      return existsStat != null;
     } else {
-      return keeper.exists(path, null) != null;
+      Stat existsStat = keeper.exists(path, null);
+      log.info("exists state return is {} {}", path, existsStat);
+      return existsStat != null;
     }
   }
 


[lucene-solr] 04/05: @1132 Don't lose a collection

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 76c455547d793b6f3dc03622c49d85dff030b68b
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 21:36:35 2020 -0600

    @1132 Don't lose a collection
---
 .../cloud/api/collections/CreateCollectionCmd.java | 38 ++++++++++++++++++----
 .../apache/solr/cloud/overseer/SliceMutator.java   |  4 +--
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 11 +++++--
 3 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index f664dd0..bb6a826 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -320,32 +321,55 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
           @SuppressWarnings({"rawtypes"}) boolean failure = results.get("failure") != null && ((SimpleOrderedMap) results.get("failure")).size() > 0;
           if (failure) {
+            log.error("Failure creating collection {}", results.get("failure"));
             //        // Let's cleanup as we hit an exception
             //        // We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
             //        // element, which may be interpreted by the user as a positive ack
             //        // nocommit review
             try {
-              response.clusterState =  ocmh.cleanupCollection(collectionName, new NamedList<Object>()).clusterState;
+              response.clusterState = ocmh.cleanupCollection(collectionName, new NamedList<Object>()).clusterState;
             } catch (Exception e) {
               log.error("Exception trying to clean up collection after fail {}", collectionName);
             }
             log.info("Cleaned up artifacts for failed create collection for [{}]", collectionName);
-                   //throw new SolrException(ErrorCode.BAD_REQUEST, "Underlying core creation failed while creating collection: " + collectionName + "\n" + results);
+            //throw new SolrException(ErrorCode.BAD_REQUEST, "Underlying core creation failed while creating collection: " + collectionName + "\n" + results);
           } else {
+
+            try {
+              zkStateReader.waitForState(collectionName, 10, TimeUnit.SECONDS, (l, c) -> {
+                if (c == null) {
+                  return false;
+                }
+                for (String name : coresToCreate.keySet()) {
+                  if (c.getReplica(name) == null) {
+                    return false;
+                  }
+                }
+                Collection<Slice> slices = c.getSlices();
+                for (Slice slice : slices) {
+                  if (slice.getLeader() == null) {
+                    return false;
+                  }
+                }
+                return true;
+              });
+            } catch (InterruptedException e) {
+              log.warn("Interrupted waiting for active replicas on collection creation {}", collectionName);
+            } catch (TimeoutException e) {
+              log.error("Exception waiting for active replicas on collection creation {}", collectionName);
+            }
+
             if (log.isDebugEnabled()) log.debug("Finished create command on all shards for collection: {}", collectionName);
 
             // Emit a warning about production use of data driven functionality
             boolean defaultConfigSetUsed = message.getStr(COLL_CONF) == null || message.getStr(COLL_CONF).equals(ConfigSetsHandlerApi.DEFAULT_CONFIGSET_NAME);
             if (defaultConfigSetUsed) {
-              results.add("warning",
-                  "Using _default configset. Data driven schema functionality" + " is enabled by default, which is NOT RECOMMENDED for production use. To turn it off:" + " curl http://{host:port}/solr/"
-                      + collectionName + "/config -d '{\"set-user-property\": {\"update.autoCreateFields\":\"false\"}}'");
+              results.add("warning", "Using _default configset. Data driven schema functionality" + " is enabled by default, which is NOT RECOMMENDED for production use. To turn it off:"
+                  + " curl http://{host:port}/solr/" + collectionName + "/config -d '{\"set-user-property\": {\"update.autoCreateFields\":\"false\"}}'");
             }
 
           }
 
-
-          zkStateReader.waitForActiveCollection(collectionName, 10, TimeUnit.SECONDS, shardNames.size(), finalReplicaPositions.size());
           return response;
         }
       };
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 9d092db..9f17e28 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -125,8 +125,6 @@ public class SliceMutator {
       log.debug("setShardLeader(ClusterState clusterState={}, ZkNodeProps message={}) - start", clusterState, message);
     }
 
-    StringBuilder sb = new StringBuilder();
-
     String coreName = message.getStr(ZkStateReader.CORE_NAME_PROP);
 
     String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
@@ -134,7 +132,7 @@ public class SliceMutator {
     DocCollection coll = clusterState.getCollectionOrNull(collectionName);
 
     if (coll == null) {
-      log.error("Could not mark shard leader for non existing collection: {}", collectionName);
+      log.error("Could not mark shard leader for non existing collection: {} {}", collectionName, message);
       return clusterState;
     }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 7573453..fb3a2d5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -17,6 +17,7 @@
 package org.apache.solr.cloud.overseer;
 
 import java.lang.invoke.MethodHandles;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -104,6 +105,13 @@ public class ZkStateWriter {
           }
         }
       });
+      Collection<DocCollection> collections = cs.getCollectionsMap().values();
+      for (DocCollection collection : collections) {
+        if (clusterState.getCollectionOrNull(collection.getName()) == null) {
+          clusterState = clusterState.copyWith(collection.getName(), collection);
+        }
+      }
+
       this.cs = clusterState;
     } else {
       clusterState.forEachCollection(newCollection -> {
@@ -334,8 +342,7 @@ public class ZkStateWriter {
 
   }
 
-  public ClusterState getClusterstate(boolean stateUpdate) {
-
+  public synchronized ClusterState getClusterstate(boolean stateUpdate) {
     return cs;
   }
 


[lucene-solr] 01/05: @1129 Try this for bad schema persistence issue.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit f8708f0c79451b411c088c5e2f3269eb17de519d
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 20:10:15 2020 -0600

    @1129 Try this for bad schema persistence issue.
---
 .../org/apache/solr/schema/ManagedIndexSchema.java | 28 ++++++++++------------
 1 file changed, 12 insertions(+), 16 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
index e4dd497..7219bfe 100644
--- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
+++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
@@ -182,7 +182,7 @@ public final class ManagedIndexSchema extends IndexSchema {
    * 
    * @return true on success 
    */
-  boolean persistManagedSchemaToZooKeeper(boolean createOnly) {
+  synchronized boolean persistManagedSchemaToZooKeeper(boolean createOnly) {
     final ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader) loader;
     final ZkController zkController = zkLoader.getZkController();
     final SolrZkClient zkClient = zkController.getZkClient();
@@ -212,33 +212,29 @@ public final class ManagedIndexSchema extends IndexSchema {
       } else {
         try {
           // Assumption: the path exists
-          Stat stat = zkClient.setData(managedSchemaPath, data, schemaZkVersion, true);
+          int ver = schemaZkVersion;
+          Stat stat = zkClient.setData(managedSchemaPath, data, ver, true);
           schemaZkVersion = stat.getVersion();
-          log.info("Persisted managed schema version {} at {}", schemaZkVersion, managedSchemaPath);
+          log.info("Persisted managed schema version {} at {}", ver, managedSchemaPath);
         } catch (KeeperException.BadVersionException e) {
-          if (schemaZkVersion == 0) {
-            try {
-              Stat stat = zkClient.setData(managedSchemaPath, data, 1, true);
+            Thread.sleep(50);
+            // try again with latest schemaZkVersion value
+          int ver = 0;
+          try {
+              ver = schemaZkVersion;
+              Stat stat = zkClient.setData(managedSchemaPath, data, ver, true);
               schemaZkVersion = stat.getVersion();
-              log.info("Persisted managed schema version {} at {}", schemaZkVersion, managedSchemaPath);
+              log.info("Persisted managed schema version {} at {}", ver, managedSchemaPath);
             } catch (KeeperException.BadVersionException e1) {
               Stat stat = new Stat();
               zkClient.getData(managedSchemaPath, null, stat, true);
 
-              log.info("Bad version when trying to persist schema using {} found {}", 1, stat.getVersion());
+              log.info("Bad version when trying to persist schema using {} found {}", ver, stat.getVersion());
 
               success = false;
               schemaChangedInZk = true;
             }
-          } else {
-            Stat stat = new Stat();
-            zkClient.getData(managedSchemaPath, null, stat, true);
 
-            log.info("Bad version when trying to persist schema using {} found {}", schemaZkVersion, stat.getVersion());
-
-            success = false;
-            schemaChangedInZk = true;
-          }
         }
       }
     } catch (Exception e) {


[lucene-solr] 03/05: @1131 Working out collections queue.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit ebdb2c38380c96dc40fb6d85cf028b88001ece0b
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 20:58:11 2020 -0600

    @1131 Working out collections queue.
---
 solr/core/src/java/org/apache/solr/cloud/Overseer.java | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 4f73290..683f3da 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -956,6 +956,12 @@ public class Overseer implements SolrCloseable {
 
           Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
 
+          try {
+            zkController.getZkClient().delete(fullPaths, true);
+          } catch (Exception e) {
+            log.warn("Exception deleting processed zk nodes", e);
+          }
+
           overseer.getTaskExecutor().submit(() -> {
             try {
               runAsync(items, fullPaths, data);
@@ -1032,17 +1038,6 @@ public class Overseer implements SolrCloseable {
           } catch (Exception e) {
             log.warn("Exception deleting processed zk nodes", e);
           }
-          try {
-            for (String item : items) {
-              if (item.startsWith("qnr-")) {
-                fullPaths.remove(path + "/" + item);
-              }
-            }
-
-            zkController.getZkClient().delete(fullPaths, true);
-          } catch (Exception e) {
-            log.warn("Exception deleting processed zk nodes", e);
-          }
         }
       }
     }


[lucene-solr] 05/05: @1133 Bit of clusterstate managment work.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit b37b7184c8db0ec71212585f6b4993942fa6dcb8
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 22:30:54 2020 -0600

    @1133 Bit of clusterstate management work.
---
 .../java/org/apache/solr/cloud/LeaderElector.java  | 21 +++++++-------
 .../org/apache/solr/cloud/RecoveryStrategy.java    |  9 +++---
 .../solr/cloud/ShardLeaderElectionContextBase.java |  2 ++
 .../java/org/apache/solr/cloud/ZkController.java   |  4 +++
 .../cloud/api/collections/CreateCollectionCmd.java |  5 +++-
 .../cloud/api/collections/DeleteCollectionCmd.java |  2 ++
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 33 ++++++++++++----------
 .../java/org/apache/solr/core/CoreContainer.java   |  4 +++
 .../apache/solr/handler/admin/PrepRecoveryOp.java  | 24 +++++++++-------
 .../solr/cloud/CollectionStateZnodeTest.java       |  1 +
 .../client/solrj/impl/BaseCloudSolrClient.java     |  2 +-
 11 files changed, 66 insertions(+), 41 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 9595c4f..809ea03 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -152,17 +152,18 @@ public class LeaderElector implements Closeable {
     if (leaderSeqNodeName.equals(seqs.get(0))) {
       // I am the leader
       log.info("I am the potential leader {}, running leader process", context.leaderProps);
-      ParWork.getRootSharedExecutor().submit(() -> {
-        try {
-          if (isClosed || (zkController != null && zkController.getCoreContainer().isShutDown())) {
-            if (log.isDebugEnabled()) log.debug("Elector is closed, will not try and run leader processes");
-            return;
-          }
-          runIamLeaderProcess(context, replacement);
-        } catch (Exception e) {
-          log.error("", e);
+
+      try {
+        if (isClosed || (zkController != null && zkController.getCoreContainer().isShutDown())) {
+          if (log.isDebugEnabled()) log.debug("Elector is closed, will not try and run leader processes");
+          return false;
         }
-      });
+        runIamLeaderProcess(context, replacement);
+      } catch (AlreadyClosedException e) {
+        return false;
+      } catch (Exception e) {
+        log.error("", e);
+      }
 
     } else {
       log.info("I am not the leader - watch the node below me {}", context.getClass().getSimpleName());
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 844d4f1..d6ceeb3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -245,7 +245,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
   /**
    * This method may change in future and customisations are not supported between versions in terms of API or back
    * compat behaviour.
-   * 
+   *
    * @lucene.experimental
    */
   protected String getReplicateLeaderUrl(Replica leaderprops, ZkStateReader zkStateReader) {
@@ -333,7 +333,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   final private void commitOnLeader(String leaderUrl) throws SolrServerException,
       IOException {
-    log.info("send commit to leader");
+    log.info("send commit to leader {}", leaderUrl);
     Http2SolrClient client = core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyClient();
     UpdateRequest ureq = new UpdateRequest();
     ureq.setBasePath(leaderUrl);
@@ -404,8 +404,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
         CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
         Replica leaderprops = zkStateReader.getLeaderRetry(
             cloudDesc.getCollectionName(), cloudDesc.getShardId());
-        final String leaderBaseUrl = leaderprops.getBaseUrl();
-        final String leaderCoreName = leaderprops.getName();
 
         String leaderUrl = leaderprops.getCoreUrl();
 
@@ -738,6 +736,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
         } catch (InterruptedException | AlreadyClosedException e) {
           ParWork.propagateInterrupt(e, true);
           return;
+        } catch (NullPointerException e) {
+          if (log.isDebugEnabled()) log.debug("NullPointerException", e);
+          break;
         } catch (Exception e) {
           SolrException.log(log, "Error while trying to recover", e);
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index f1e0269..9c7698d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -185,6 +185,8 @@ class ShardLeaderElectionContextBase extends ElectionContext {
       }
       //assert leaderZkNodeParentVersion != null;
 
+    } catch (NoNodeException e) {
+      throw new AlreadyClosedException("No node exists for election");
     } catch (Throwable t) {
       ParWork.propagateInterrupt(t);
       throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed: " + errors, t);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 2bf1349..10a86d4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -164,6 +164,10 @@ public class ZkController implements Closeable, Runnable {
     return dcCalled;
   }
 
+  public LeaderElector getShardLeaderElector(String name) {
+    return leaderElectors.get(name);
+  }
+
   static class ContextKey {
 
     private String collection;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index bb6a826..63b8ad7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -341,11 +341,14 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
                   return false;
                 }
                 for (String name : coresToCreate.keySet()) {
-                  if (c.getReplica(name) == null) {
+                  if (c.getReplica(name) == null || c.getReplica(name).getState() != Replica.State.ACTIVE) {
                     return false;
                   }
                 }
                 Collection<Slice> slices = c.getSlices();
+                if (slices.size() < shardNames.size()) {
+                  return false;
+                }
                 for (Slice slice : slices) {
                   if (slice.getLeader() == null) {
                     return false;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 2ac109e..305fb31 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -188,6 +188,8 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         if (finalShardHandler != null && finalShardRequestTracker != null) {
           try {
             finalShardRequestTracker.processResponses(results, finalShardHandler, false, null, okayExceptions);
+            // TODO: wait for delete collection?
+            zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (l, c) -> c == null);
           } catch (Exception e) {
             log.error("Exception waiting for results of delete collection cmd", e);
           }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index fb3a2d5..a2c4ce1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -254,21 +254,22 @@ public class ZkStateWriter {
 
     // wait to see our last publish version has propagated
     cs.forEachCollection(collection -> {
-
+      Integer v = null;
       try {
         //System.out.println("waiting to see state " + prevVersion);
-        Integer v = trackVersions.get(collection.getName());
-        if (v == null || v == 0) return;
-
+        v = trackVersions.get(collection.getName());
+        if (v == null) v = 0;
+        if (v == 0) return;
         Integer version = v;
         try {
           log.debug("wait to see last published version for collection {} {}", collection.getName(), v);
-          reader.waitForState(collection.getName(), 15, TimeUnit.SECONDS, (l, col) -> {
-
-            //              if (col != null) {
-            //                System.out.println("the version " + col.getZNodeVersion());
-            //              }
-
+          reader.waitForState(collection.getName(), 5, TimeUnit.SECONDS, (l, col) -> {
+                      if (col == null) {
+                        return true;
+                      }
+//                          if (col != null) {
+//                            log.info("the version " + col.getZNodeVersion());
+//                          }
             if (col != null && col.getZNodeVersion() >= version) {
               if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion() + 1);
               // System.out.println("found the version");
@@ -281,8 +282,7 @@ public class ZkStateWriter {
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
         }
       } catch (TimeoutException e) {
-        log.warn("Timeout waiting to see written cluster state come back");
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+        log.warn("Timeout waiting to see written cluster state come back " + v);
       }
 
     });
@@ -307,12 +307,15 @@ public class ZkStateWriter {
           Integer v = trackVersions.get(collection.getName());
           if (v != null) {
             version = v;
-            trackVersions.put(collection.getName(), v + 1);
-          } else {
-            trackVersions.put(collection.getName(), version + 1);
           }
 
           reader.getZkClient().setData(path, data, version, true);
+
+          trackVersions.put(collection.getName(), version + 1);
+        } catch (KeeperException.NoNodeException e) {
+          if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
+          trackVersions.remove(collection.getName());
+          // likely deleted
         } catch (KeeperException.BadVersionException bve) {
           lastFailedException.set(bve);
           failedUpdates.put(collection.getName(), collection);
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 379a2eb..f2a91e7 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1741,6 +1741,10 @@ public class CoreContainer implements Closeable {
 
     if (name != null) {
 
+      if (isZooKeeperAware()) {
+        IOUtils.closeQuietly(getZkController().getShardLeaderElector(name));
+      }
+
       // check for core-init errors first
       CoreLoadFailure loadFailure = coreInitFailures.remove(name);
       if (loadFailure != null) {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 8dd7efc..ee31f8f 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -38,6 +38,7 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.admin.CoreAdminHandler.CallInfo;
 import org.apache.solr.util.TestInjection;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,17 +94,20 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
             state = replica.getState();
             live = n.contains(nodeName);
 
-            ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collection, c.getSlice(replica).getName());
-            // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
-            if (waitForState == Replica.State.RECOVERING && shardTerms.registered(cname)
-                && shardTerms.skipSendingUpdatesTo(cname)) {
-              // The replica changed its term, then published itself as RECOVERING.
-              // This core already see replica as RECOVERING
-              // so it is guarantees that a live-fetch will be enough for this core to see max term published
-              log.info("refresh shard terms for core {}", cname);
-              shardTerms.refreshTerms();
+            try {
+              ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collection, c.getSlice(replica).getName());
+              // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
+              if (waitForState == Replica.State.RECOVERING && shardTerms.registered(cname) && shardTerms.skipSendingUpdatesTo(cname)) {
+                // The replica changed its term, then published itself as RECOVERING.
+                // This core already see replica as RECOVERING
+                // so it is guaranteed that a live-fetch will be enough for this core to see max term published
+                log.info("refresh shard terms for core {}", cname);
+                shardTerms.refreshTerms();
+              }
+            } catch (NullPointerException e) {
+              if (log.isDebugEnabled()) log.debug("No shards found", e);
+              // likely deleted shard/collection
             }
-
             if (log.isInfoEnabled()) {
               log.info(
                   "In WaitForState(" + waitForState + "): collection=" + collection +
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java
index 1636ae9..ea398af 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java
@@ -25,6 +25,7 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
+// nocommit - speed this up - waits for zkwriter to see its own state after delete
 public class CollectionStateZnodeTest extends SolrCloudTestCase {
 
   @BeforeClass
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index a1f6ff4..26f5f76 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -1089,7 +1089,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
       Integer ver = (Integer) resp.get("csver");
       if (ver != null) {
         try {
-          log.info("Wait for catch up to server state");
+          log.info("Wait for catch up to server state {}", ver);
           getZkStateReader().waitForState(collection, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
             if (collectionState != null && collectionState.getZNodeVersion() >= ver) {
               return true;