You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/11 18:28:31 UTC

[lucene-solr] branch reference_impl updated (169fbe7 -> fbff836)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a change to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


    from 169fbe7  @1454 Need to be thread safe.
     new ec7a1da  @1455 Write the collection we are interested in.
     new fbff836  @1456 Cleanup current proper waiting.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/java/org/apache/solr/cloud/Overseer.java   |  87 ++++++--
 .../solr/cloud/OverseerTaskExecutorTask.java       |   6 +-
 .../cloud/api/collections/CreateCollectionCmd.java |   2 +-
 .../solr/cloud/api/collections/CreateShardCmd.java |   2 +-
 .../cloud/api/collections/DeleteReplicaCmd.java    |   4 +-
 .../solr/cloud/api/collections/DeleteShardCmd.java |   2 +-
 .../solr/cloud/api/collections/MoveReplicaCmd.java |   5 +-
 .../OverseerCollectionMessageHandler.java          |   4 +-
 .../solr/cloud/api/collections/ReplaceNodeCmd.java |   6 +-
 .../solr/cloud/api/collections/SplitShardCmd.java  |   2 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 233 ++++++++++-----------
 .../CreateCollectionsIndexAndRestartTest.java      |   2 +-
 .../apache/solr/common/cloud/ZkStateReader.java    |   2 +-
 13 files changed, 193 insertions(+), 164 deletions(-)


[lucene-solr] 01/02: @1455 Write the collection we are interested in.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit ec7a1da7468307de43c51e0412575e4922c6d9cb
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Mar 11 12:02:39 2021 -0600

    @1455 Write the collection we are interested in.
    
    Took 22 minutes
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |  21 +-
 .../solr/cloud/OverseerTaskExecutorTask.java       |   6 +-
 .../cloud/api/collections/CreateCollectionCmd.java |   2 +-
 .../solr/cloud/api/collections/CreateShardCmd.java |   2 +-
 .../cloud/api/collections/DeleteReplicaCmd.java    |   4 +-
 .../solr/cloud/api/collections/DeleteShardCmd.java |   2 +-
 .../solr/cloud/api/collections/MoveReplicaCmd.java |   5 +-
 .../OverseerCollectionMessageHandler.java          |   4 +-
 .../solr/cloud/api/collections/ReplaceNodeCmd.java |   6 +-
 .../solr/cloud/api/collections/SplitShardCmd.java  |   2 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 233 ++++++++++-----------
 11 files changed, 133 insertions(+), 154 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index dd665e2..f200fe7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -72,7 +72,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -712,8 +711,8 @@ public class Overseer implements SolrCloseable {
     return future;
   }
 
-  public Future writePendingUpdates() {
-    return ParWork.getRootSharedExecutor().submit(new OverseerTaskExecutorTask.WriteTask(getCoreContainer(), zkStateWriter));
+  public Future writePendingUpdates(String collection) {
+    return ParWork.getRootSharedExecutor().submit(new OverseerTaskExecutorTask.WriteTask(getCoreContainer(), collection));
   }
 
   private static abstract class QueueWatcher implements Watcher, Closeable {
@@ -766,7 +765,7 @@ public class Overseer implements SolrCloseable {
         return;
       }
 
-      //ourLock.lock();
+      ourLock.lock();
       try {
         try {
           List<String> items = getItems();
@@ -779,7 +778,7 @@ public class Overseer implements SolrCloseable {
           log.error("Exception during overseer queue queue processing", e);
         }
       } finally {
-     //   ourLock.unlock();
+        ourLock.unlock();
       }
 
     }
@@ -820,7 +819,6 @@ public class Overseer implements SolrCloseable {
     @Override
     protected void processQueueItems(List<String> items, boolean onStart) {
       List<String> fullPaths = new ArrayList<>(items.size());
-      ourLock.lock();
       try {
         if (log.isDebugEnabled()) log.debug("Found state update queue items {}", items);
         for (String item : items) {
@@ -856,9 +854,13 @@ public class Overseer implements SolrCloseable {
             log.error("failed waiting for enqueued updates", e);
           }
         }
-        overseer.writePendingUpdates();
 
+        Set<String> collections = overseer.zkStateWriter.getDirtyStateCollections();
+        for (String collection : collections) {
+          overseer.writePendingUpdates(collection);
+        }
       } finally {
+
         if (overseer.zkStateWriter != null) {
           if (zkController.getZkClient().isAlive()) {
             try {
@@ -867,8 +869,8 @@ public class Overseer implements SolrCloseable {
               log.warn("Failed deleting processed items", e);
             }
           }
+
         }
-        ourLock.unlock();
       }
     }
 
@@ -909,7 +911,6 @@ public class Overseer implements SolrCloseable {
       @Override
       protected void processQueueItems(List<String> items, boolean onStart) {
         List<String> fullPaths = new ArrayList<>(items.size());
-        ourLock.lock();
         try {
           log.info("Found collection queue items {} onStart={}", items, onStart);
           for (String item : items) {
@@ -940,9 +941,7 @@ public class Overseer implements SolrCloseable {
           } catch (Exception e) {
             log.warn("Delete items failed {}", e.getMessage());
           }
-          ourLock.unlock();
         }
-
       }
 
       private void runAsync(Map.Entry<String,byte[]> entry, boolean onStart) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
index ac6a4cb..c118ac5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
@@ -82,16 +82,18 @@ public class OverseerTaskExecutorTask {
   }
 
   public static class WriteTask implements Runnable {
+    private final String collection;
     CoreContainer coreContainer;
 
-    public WriteTask(CoreContainer coreContainer, ZkStateWriter zkStateWriter) {
+    public WriteTask(CoreContainer coreContainer, String collection) {
+      this.collection = collection;
       this.coreContainer = coreContainer;
     }
 
     @Override
     public void run() {
       try {
-        coreContainer.getZkController().getOverseer().getZkStateWriter().writePendingUpdates();
+        coreContainer.getZkController().getOverseer().getZkStateWriter().writePendingUpdates(collection);
       } catch (NullPointerException e) {
         if (log.isDebugEnabled()) log.debug("Won't write pending updates, zkStateWriter=null");
       } catch (Exception e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 4783d7b..0ba35e8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -321,7 +321,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
       Future future = ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null, false);
       future.get();
-      writeFuture = ocmh.overseer.writePendingUpdates();
+      writeFuture = ocmh.overseer.writePendingUpdates(collectionName);
 
       if (log.isDebugEnabled()) log.debug("Sending create call for {} replicas for {}", coresToCreate.size(), collectionName);
       for (Map.Entry<String,ShardRequest> e : coresToCreate.entrySet()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index c9a2708..91851f9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -188,7 +188,7 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
         log.error("failure", e);
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
-      response.writeFuture = overseer.writePendingUpdates();
+      response.writeFuture = overseer.writePendingUpdates(collection);
       if (resp.asyncFinalRunner != null) {
         try {
           resp.asyncFinalRunner.call();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 4b8a170..29ea39c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -198,7 +198,7 @@ public class DeleteReplicaCmd implements Cmd {
           if (waitForFinalState) {
             try {
               ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(finalCollectionName1), null, false).get();
-              ocmh.overseer.writePendingUpdates();
+              ocmh.overseer.writePendingUpdates(finalCollectionName1);
               waitForCoreNodeGone(finalCollectionName1, shard, replicaName, 5000); // MRM TODO: timeout
             } catch (Exception e) {
               log.error("Failed waiting for replica to be removed", e);
@@ -280,7 +280,7 @@ public class DeleteReplicaCmd implements Cmd {
         for (Future future : futures) {
           future.get();
         }
-        ocmh.overseer.writePendingUpdates();
+        ocmh.overseer.writePendingUpdates(collectionName);
       } catch (Exception e) {
         log.error("failed writing update to zkstatewriter", e);
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index 3a3be00..1ab2ebe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -178,7 +178,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
       }
       if (waitForFinalState) {
         ocmh.overseer.getZkStateWriter().enqueueUpdate(finalClusterState.getCollection(collectionName), null, false).get();
-        ocmh.overseer.writePendingUpdates().get();
+        ocmh.overseer.writePendingUpdates(collectionName).get();
         ocmh.overseer.getZkStateReader().waitForState(collectionName, 10, TimeUnit.SECONDS, (liveNodes, coll) -> {
           if (coll == null) {
             return true;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
index 8ae6612..e9f6aaa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
@@ -308,8 +308,9 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
         @SuppressWarnings({"rawtypes"}) NamedList deleteResult = new NamedList();
         try {
           response1.clusterState = ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult).clusterState;
-          ocmh.overseer.getZkStateWriter().enqueueUpdate( response1.clusterState.getCollection(response1.clusterState.getCollectionsMap().keySet().iterator().next()), null,false).get();
-          asyncResp.writeFuture = ocmh.overseer.writePendingUpdates();
+          String collection = response1.clusterState.getCollectionsMap().keySet().iterator().next();
+          ocmh.overseer.getZkStateWriter().enqueueUpdate( response1.clusterState.getCollection(collection), null,false).get();
+          asyncResp.writeFuture = ocmh.overseer.writePendingUpdates(collection);
         } catch (SolrException e) {
           deleteResult.add("failure", e.toString());
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 3126c0e..faa10df 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -333,7 +333,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
             }
             if (asyncResp == null || asyncResp.writeFuture == null) {
               future.get();
-              writeFuture2 = overseer.writePendingUpdates();
+              writeFuture2 = overseer.writePendingUpdates(collection);
             }
 
           } else {
@@ -356,7 +356,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
               if (future != null) {
                 future.get();
               }
-              writeFuture = overseer.writePendingUpdates();
+              writeFuture = overseer.writePendingUpdates(collection);
             }
           }
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
index a694708..2b0fdec 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -145,9 +145,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
       };
       runners.add(runner);
     }
-
-    ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(clusterState.getCollectionStates().keySet().iterator().next()), null, false).get();
-    ocmh.overseer.writePendingUpdates();
+    String collection = clusterState.getCollectionStates().keySet().iterator().next();
+    ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collection), null, false).get();
+    ocmh.overseer.writePendingUpdates(collection);
 
     CollectionCmdResponse.Response response = new CollectionCmdResponse.Response();
     response.results = results;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index ca636a8..793ac18 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -350,7 +350,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       }
 
       ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState.getCollection(collectionName), null,false).get();
-      ocmh.overseer.writePendingUpdates();
+      ocmh.overseer.writePendingUpdates(collectionName);
 
       log.info("Clusterstate after adding new shard for split {}", clusterState);
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 92123bf..c039414 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -67,7 +67,6 @@ public class ZkStateWriter {
 
   protected volatile Stats stats;
 
-  AtomicReference<Exception> lastFailedException = new AtomicReference<>();
   private final Map<String,Integer> trackVersions = new ConcurrentHashMap<>();
 
   private final Map<String, ZkNodeProps> stateUpdates = new ConcurrentHashMap<>();
@@ -510,11 +509,11 @@ public class ZkStateWriter {
    */
 
   // if additional updates too large, publish structure change
-  public void writePendingUpdates() {
+  public void writePendingUpdates(String collection) {
 
     do {
       try {
-        write();
+        write(collection);
         break;
       } catch (KeeperException.BadVersionException e) {
 
@@ -527,156 +526,129 @@ public class ZkStateWriter {
 
   }
 
-  private void write() throws KeeperException.BadVersionException {
-    // writeLock.lock();
-    // try {
-    //   log.info("Get our write lock");
-
-    //ourLock.lock();
-    try {
-      //     log.info("Got our write lock");
-      if (log.isDebugEnabled()) {
-        log.debug("writePendingUpdates {}", cs);
-      }
+  private void write(String coll) throws KeeperException.BadVersionException {
 
+    if (log.isDebugEnabled()) {
+      log.debug("writePendingUpdates {}", cs);
+    }
 
+    AtomicInteger lastVersion = new AtomicInteger();
+    AtomicReference<KeeperException.BadVersionException> badVersionException = new AtomicReference();
 
-      //      if (failedUpdates.size() > 0) {
-      //        Exception lfe = lastFailedException.get();
-      //        log.warn("Some collection updates failed {} logging last exception", failedUpdates, lfe); // MRM TODO: expand
-      //        failedUpdates.clear();
-      //        lfe = null;
-      //        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, lfe);
-      //      }
-      //      } finally {
-      //        ourLock.unlock();
-      //      }
-
-      // wait to see our last publish version has propagated TODO don't wait on collections not hosted on overseer?
-      // waitForStateWePublishedToComeBack();
-
-      //   ourLock.lock();
-      AtomicInteger lastVersion = new AtomicInteger();
-      AtomicReference<KeeperException.BadVersionException> badVersionException = new AtomicReference();
-      List<String> removeCollections = new ArrayList<>();
-      //log.info("writing out state, looking at collections count={} toWrite={} {} : {}", cs.getCollectionsMap().size(), collectionsToWrite.size(), cs.getCollectionsMap().keySet(), collectionsToWrite);
-      //try {
-      cs.values().forEach(collection -> {
-
-
-        if (log.isDebugEnabled()) log.debug("check collection {} {} {}", collection, dirtyStructure, dirtyState);
-        Integer version = null;
-        if (dirtyStructure.contains(collection.getName()) || dirtyState.contains(collection.getName())) {
-          log.info("process collection {}", collection);
-          ColState collState = collLocks.compute(collection.getName(), (s, reentrantLock) -> {
-            if (reentrantLock == null) {
-              ColState colState = new ColState();
-              return colState;
-            }
-            return reentrantLock;
-          });
+    DocCollection collection = cs.get(coll);
 
-          collState.collLock.lock();
-          try {
-            collState.throttle.minimumWaitBetweenActions();
-            collState.throttle.markAttemptingAction();
-            String name = collection.getName();
-            String path = ZkStateReader.getCollectionPath(collection.getName());
-            String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
-            // log.info("process collection {} path {}", collection.getName(), path);
-            Stat existsStat = null;
-            if (log.isTraceEnabled()) log.trace("process {}", collection);
-            try {
-              // log.info("get data for {}", name);
-              byte[] data = Utils.toJSON(singletonMap(name, collection));
-              //  log.info("got data for {} {}", name, data.length);
+    if (collection == null) {
+      return;
+    }
 
-              try {
+    if (log.isDebugEnabled()) log.debug("check collection {} {} {}", collection, dirtyStructure, dirtyState);
+    Integer version = null;
+    if (dirtyStructure.contains(collection.getName()) || dirtyState.contains(collection.getName())) {
+      log.info("process collection {}", collection);
+      ColState collState = collLocks.compute(collection.getName(), (s, reentrantLock) -> {
+        if (reentrantLock == null) {
+          ColState colState = new ColState();
+          return colState;
+        }
+        return reentrantLock;
+      });
 
-                if (dirtyStructure.contains(collection.getName())) {
-                  if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
+      collState.collLock.lock();
+      try {
+        collState.throttle.minimumWaitBetweenActions();
+        collState.throttle.markAttemptingAction();
+        String name = collection.getName();
+        String path = ZkStateReader.getCollectionPath(collection.getName());
+        String pathSCN = ZkStateReader.getCollectionSCNPath(collection.getName());
+        // log.info("process collection {} path {}", collection.getName(), path);
+        Stat existsStat = null;
+        if (log.isTraceEnabled()) log.trace("process {}", collection);
+        try {
+          // log.info("get data for {}", name);
+          byte[] data = Utils.toJSON(singletonMap(name, collection));
+          //  log.info("got data for {} {}", name, data.length);
 
-                  Integer v = trackVersions.get(collection.getName());
+          try {
 
-                  if (v != null) {
-                    //log.info("got version from cache {}", v);
-                    version = v;
-                  } else {
-                    version = 0;
-                  }
-                  lastVersion.set(version);
-                  if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
+            if (dirtyStructure.contains(collection.getName())) {
+              if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
 
-                  reader.getZkClient().setData(path, data, version, true, false);
-                  if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), version + 1);
-                  trackVersions.put(collection.getName(), version + 1);
+              Integer v = trackVersions.get(collection.getName());
 
-                  reader.getZkClient().setData(pathSCN, null, -1, true, false);
-                  dirtyStructure.remove(collection.getName());
+              if (v != null) {
+                //log.info("got version from cache {}", v);
+                version = v;
+              } else {
+                version = 0;
+              }
+              lastVersion.set(version);
+              if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
 
-                  ZkNodeProps updates = stateUpdates.get(collection.getName());
-                  if (updates != null) {
-                    updates.getProperties().clear();
-                  }
-                }
+              reader.getZkClient().setData(path, data, version, true, false);
+              if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), version + 1);
+              trackVersions.put(collection.getName(), version + 1);
 
-              } catch (KeeperException.NoNodeException e) {
-                if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
-
-                lastVersion.set(-1);
-                trackVersions.remove(collection.getName());
-                stateUpdates.remove(collection.getName());
-                cs.remove(collection);
-                // likely deleted
-
-              } catch (KeeperException.BadVersionException bve) {
-                log.info("Tried to update state.json ({}) with bad version", collection);
-                //lastFailedException.set(bve);
-                //failedUpdates.put(collection.getName(), collection);
-                // Stat estate = reader.getZkClient().exists(path, null);
-                trackVersions.remove(collection.getName());
-                Stat stat = reader.getZkClient().exists(path, null, false, false);
-                log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, version, stat != null ? stat.getVersion() : "null");
-
-                if (!overseer.isClosed() && stat != null) {
-                  trackVersions.put(collection.getName(), stat.getVersion());
-                }
-                throw bve;
-              }
+              reader.getZkClient().setData(pathSCN, null, -1, true, false);
+              dirtyStructure.remove(collection.getName());
 
-              if (dirtyState.contains(collection.getName())) { //&& !dirtyStructure.contains(collection.getName())
-                ZkNodeProps updates = stateUpdates.get(collection.getName());
-                if (updates != null) {
-                  writeStateUpdates(collection, updates);
-                }
+              ZkNodeProps updates = stateUpdates.get(collection.getName());
+              if (updates != null) {
+                updates.getProperties().clear();
               }
+            }
 
-            } catch (KeeperException.BadVersionException bve) {
-              badVersionException.set(bve);
-            } catch (InterruptedException | AlreadyClosedException e) {
-              log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
-
-            } catch (Exception e) {
-              log.error("Failed processing update=" + collection, e);
+          } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
+
+            lastVersion.set(-1);
+            trackVersions.remove(collection.getName());
+            stateUpdates.remove(collection.getName());
+            cs.remove(collection);
+            // likely deleted
+
+          } catch (KeeperException.BadVersionException bve) {
+            log.info("Tried to update state.json ({}) with bad version", collection);
+            //lastFailedException.set(bve);
+            //failedUpdates.put(collection.getName(), collection);
+            // Stat estate = reader.getZkClient().exists(path, null);
+            trackVersions.remove(collection.getName());
+            Stat stat = reader.getZkClient().exists(path, null, false, false);
+            log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, version, stat != null ? stat.getVersion() : "null");
+
+            if (!overseer.isClosed() && stat != null) {
+              trackVersions.put(collection.getName(), stat.getVersion());
             }
-          } finally {
-            collState.collLock.unlock();
+            throw bve;
           }
-        }
 
-      });
+          if (dirtyState.contains(collection.getName())) { //&& !dirtyStructure.contains(collection.getName())
+            ZkNodeProps updates = stateUpdates.get(collection.getName());
+            if (updates != null) {
+              writeStateUpdates(collection, updates);
+            }
+          }
 
-      //removeCollections.forEach(c ->  removeCollection(c));
+        } catch (KeeperException.BadVersionException bve) {
+          badVersionException.set(bve);
+        } catch (InterruptedException | AlreadyClosedException e) {
+          log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
 
-      if (badVersionException.get() != null) {
-        throw badVersionException.get();
+        } catch (Exception e) {
+          log.error("Failed processing update=" + collection, e);
+        }
+      } finally {
+        collState.collLock.unlock();
       }
+    }
 
-      //log.info("Done with successful cluster write out");
-    } finally {
+    //removeCollections.forEach(c ->  removeCollection(c));
 
+    if (badVersionException.get() != null) {
+      throw badVersionException.get();
     }
 
+    //log.info("Done with successful cluster write out");
+
     //    } finally {
     //      writeLock.unlock();
     //    }
@@ -703,6 +675,11 @@ public class ZkStateWriter {
     return ClusterState.getRefCS(cs, -2);
   }
 
+  public Set<String> getDirtyStateCollections() {
+    return dirtyState;
+  }
+
+
   public void removeCollection(String collection) {
     log.info("Removing collection from zk state {}", collection);
     ColState collState = collLocks.compute(collection, (s, reentrantLock) -> {


[lucene-solr] 02/02: @1456 Cleanup current proper waiting.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit fbff83681a0baa8c4edf51a61af6c9f66e92addd
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Mar 11 12:28:06 2021 -0600

    @1456 Cleanup current proper waiting.
    
    Took 25 minutes
---
 .../src/java/org/apache/solr/cloud/Overseer.java   | 76 ++++++++++++++++++----
 .../CreateCollectionsIndexAndRestartTest.java      |  2 +-
 .../apache/solr/common/cloud/ZkStateReader.java    |  2 +-
 3 files changed, 65 insertions(+), 15 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index f200fe7..80bcf42 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -810,15 +810,24 @@ public class Overseer implements SolrCloseable {
 
 
     public void start() throws KeeperException, InterruptedException {
-      zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
-      startItems = super.getItems();
-      log.info("Overseer found entries on start {}", startItems);
-      processQueueItems(startItems, true);
+      if (closed) return;
+      ourLock.lock();
+      try {
+        if (closed) return;
+        zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
+        startItems = super.getItems();
+        log.info("Overseer found entries on start {}", startItems);
+        processQueueItems(startItems, true);
+      } finally {
+        ourLock.unlock();
+      }
     }
 
     @Override
     protected void processQueueItems(List<String> items, boolean onStart) {
+      if (closed) return; // nothing to do once this worker is closed
       List<String> fullPaths = new ArrayList<>(items.size());
+      ourLock.lock(); // acquired before the try; released unconditionally in the finally below
       try {
         if (log.isDebugEnabled()) log.debug("Found state update queue items {}", items);
         for (String item : items) {
@@ -826,12 +835,13 @@ public class Overseer implements SolrCloseable {
         }
 
         Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
+        Set<String> shardStateCollections = null; // created lazily, only when an updateshardstate message is seen
         List<Future> futures = new ArrayList<>();
         for (byte[] item : data.values()) {
           final ZkNodeProps message = ZkNodeProps.load(item);
           try {
+            String operation = message.getStr(Overseer.QUEUE_OPERATION); // now read for every message, not only onStart
             if (onStart) {
-              String operation = message.getStr(Overseer.QUEUE_OPERATION);
               if (operation.equals("state")) {
                 message.getProperties().remove(OverseerAction.DOWNNODE);
                 if (message.getProperties().size() == 1) {
@@ -839,6 +849,15 @@ public class Overseer implements SolrCloseable {
                 }
               }
             }
+
+            // hack: remember updateshardstate collections; they are written again after the dirty-state flush below
+            if ("updateshardstate".equals(operation)) { // null-safe: operation is no longer guaranteed to be checked first
+              if (shardStateCollections == null) {
+                shardStateCollections = new HashSet<>();
+              }
+              shardStateCollections.add(message.getStr("collection"));
+            }
+
             Future future = overseer.processQueueItem(message);
             if (future != null) {
               futures.add(future);
@@ -854,10 +873,31 @@ public class Overseer implements SolrCloseable {
             log.error("failed waiting for enqueued updates", e);
           }
         }
-
+        futures.clear(); // the processQueueItem futures have all been waited on; reuse the list for the state writes
         Set<String> collections = overseer.zkStateWriter.getDirtyStateCollections();
         for (String collection : collections) {
-          overseer.writePendingUpdates(collection);
+          futures.add(overseer.writePendingUpdates(collection));
+        }
+
+        for (Future future : futures) {
+          try {
+            future.get();
+          } catch (Exception e) {
+            log.error("failed waiting for enqueued updates", e);
+          }
+        }
+        futures.clear();
+        if (shardStateCollections != null) {
+          for (String collection : shardStateCollections) {
+            futures.add(overseer.writePendingUpdates(collection));
+          }
+        }
+        for (Future future : futures) {
+          try {
+            future.get();
+          } catch (Exception e) {
+            log.error("failed waiting for enqueued updates", e);
+          }
         }
       } finally {
 
@@ -869,7 +909,7 @@ public class Overseer implements SolrCloseable {
               log.warn("Failed deleting processed items", e);
             }
           }
-
         }
+        ourLock.unlock(); // outside the nested block so it runs on every path out of the try
       }
     }
@@ -893,23 +933,32 @@ public class Overseer implements SolrCloseable {
 
       @Override
       public void close() {
+        super.close(); // close the worker before its message handlers (presumably so no new item reaches a closed handler - confirm)
         IOUtils.closeQuietly(collMessageHandler);
         IOUtils.closeQuietly(configMessageHandler);
-        super.close();
       }
 
       @Override
       public void start() throws KeeperException, InterruptedException {
-        zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
+        if (closed) return; // cheap check before contending for ourLock
+        ourLock.lock(); // NOTE(review): processQueueItems also locks ourLock while this is held, so it must be reentrant - confirm
+        try {
+          if (closed) return; // re-check now that ourLock is held; the finally still releases it
+          zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
 
-        startItems = super.getItems();
+          startItems = super.getItems();
 
-        log.info("Overseer found entries on start {}", startItems);
-        processQueueItems(startItems, true);
+          log.info("Overseer found entries on start {}", startItems);
+          processQueueItems(startItems, true);
+        } finally {
+          ourLock.unlock(); // runs on the early return above as well
+        }
       }
 
       @Override
       protected void processQueueItems(List<String> items, boolean onStart) {
+        if (closed) return; // nothing to do once this worker is closed
+        ourLock.lock(); // acquired before the try so the unlock in finally always pairs with it
         List<String> fullPaths = new ArrayList<>(items.size());
         try {
           log.info("Found collection queue items {} onStart={}", items, onStart);
@@ -941,6 +990,7 @@ public class Overseer implements SolrCloseable {
           } catch (Exception e) {
             log.warn("Delete items failed {}", e.getMessage());
           }
+          ourLock.unlock(); // released at the end of finally, after the delete attempt above
         }
       }
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
index 0ba3acf..3e70f81 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
@@ -56,7 +56,7 @@ public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
 
   @Test
   public void start() throws Exception {
-    int collectionCnt = 40;
+    int collectionCnt = 2; // NOTE(review): reduced from 40 by this change - confirm intended, not a leftover local-debugging value
     List<Future> futures = new ArrayList<>();
     List<Future> indexFutures = new ArrayList<>();
     for (int i = 0; i < collectionCnt; i ++) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 65448e4..a278b0b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1855,7 +1855,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
             }
 
             Replica replica = docCollection.getReplicaById(id);
-            if (log.isDebugEnabled()) log.debug("Got additional state update replica={} id={} ids={} {} {}", replica, id, docCollection.getReplicaByIds(), state == null ? "leader" : state);
+            if (log.isDebugEnabled()) log.debug("Got additional state update {} replica={} id={} ids={}", state == null ? "leader" : state, replica, id, docCollection.getReplicaByIds());
 
             if (replica != null) {