You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/11 18:28:33 UTC

[lucene-solr] 02/02: @1456 Cleanup current proper waiting.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit fbff83681a0baa8c4edf51a61af6c9f66e92addd
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Mar 11 12:28:06 2021 -0600

    @1456 Cleanup current proper waiting.
    
    Took 25 minutes
---
 .../src/java/org/apache/solr/cloud/Overseer.java   | 76 ++++++++++++++++++----
 .../CreateCollectionsIndexAndRestartTest.java      |  2 +-
 .../apache/solr/common/cloud/ZkStateReader.java    |  2 +-
 3 files changed, 65 insertions(+), 15 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index f200fe7..80bcf42 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -810,15 +810,24 @@ public class Overseer implements SolrCloseable {
 
 
     public void start() throws KeeperException, InterruptedException {
-      zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
-      startItems = super.getItems();
-      log.info("Overseer found entries on start {}", startItems);
-      processQueueItems(startItems, true);
+      if (closed) return;
+      ourLock.lock();
+      try {
+        if (closed) return;
+        zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
+        startItems = super.getItems();
+        log.info("Overseer found entries on start {}", startItems);
+        processQueueItems(startItems, true);
+      } finally {
+        ourLock.unlock();
+      }
     }
 
     @Override
     protected void processQueueItems(List<String> items, boolean onStart) {
+      if (closed) return;
       List<String> fullPaths = new ArrayList<>(items.size());
+      ourLock.lock();
       try {
         if (log.isDebugEnabled()) log.debug("Found state update queue items {}", items);
         for (String item : items) {
@@ -826,12 +835,13 @@ public class Overseer implements SolrCloseable {
         }
 
         Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
+        Set<String> shardStateCollections = null;
         List<Future> futures = new ArrayList<>();
         for (byte[] item : data.values()) {
           final ZkNodeProps message = ZkNodeProps.load(item);
           try {
+            String operation = message.getStr(Overseer.QUEUE_OPERATION);
             if (onStart) {
-              String operation = message.getStr(Overseer.QUEUE_OPERATION);
               if (operation.equals("state")) {
                 message.getProperties().remove(OverseerAction.DOWNNODE);
                 if (message.getProperties().size() == 1) {
@@ -839,6 +849,15 @@ public class Overseer implements SolrCloseable {
                 }
               }
             }
+
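+            // hack: remember collections that received an "updateshardstate" message so their pending updates get one more write pass after the dirty-collection writes below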
+            if (operation.equals("updateshardstate")) {
+              if (shardStateCollections == null) {
+                shardStateCollections = new HashSet<>();
+              }
+              shardStateCollections.add(message.getStr("collection"));
+            }
+
             Future future = overseer.processQueueItem(message);
             if (future != null) {
               futures.add(future);
@@ -854,10 +873,31 @@ public class Overseer implements SolrCloseable {
             log.error("failed waiting for enqueued updates", e);
           }
         }
-
+        futures.clear();
         Set<String> collections = overseer.zkStateWriter.getDirtyStateCollections();
         for (String collection : collections) {
-          overseer.writePendingUpdates(collection);
+          futures.add(overseer.writePendingUpdates(collection));
+        }
+
+        for (Future future : futures) {
+          try {
+            future.get();
+          } catch (Exception e) {
+            log.error("failed waiting for enqueued updates", e);
+          }
+        }
+        futures.clear();
+        if (shardStateCollections != null) {
+          for (String collection : shardStateCollections) {
+            futures.add(overseer.writePendingUpdates(collection));
+          }
+        }
+        for (Future future : futures) {
+          try {
+            future.get();
+          } catch (Exception e) {
+            log.error("failed waiting for enqueued updates", e);
+          }
         }
       } finally {
 
@@ -869,7 +909,7 @@ public class Overseer implements SolrCloseable {
               log.warn("Failed deleting processed items", e);
             }
           }
-
+          ourLock.unlock();
         }
       }
     }
@@ -893,23 +933,32 @@ public class Overseer implements SolrCloseable {
 
       @Override
       public void close() {
+        super.close();
         IOUtils.closeQuietly(collMessageHandler);
         IOUtils.closeQuietly(configMessageHandler);
-        super.close();
       }
 
       @Override
       public void start() throws KeeperException, InterruptedException {
-        zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
+        if (closed) return;
+        ourLock.lock();
+        try {
+          if (closed) return;
+          zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
 
-        startItems = super.getItems();
+          startItems = super.getItems();
 
-        log.info("Overseer found entries on start {}", startItems);
-        processQueueItems(startItems, true);
+          log.info("Overseer found entries on start {}", startItems);
+          processQueueItems(startItems, true);
+        } finally {
+          ourLock.unlock();
+        }
       }
 
       @Override
       protected void processQueueItems(List<String> items, boolean onStart) {
+        if (closed) return;
+        ourLock.lock();
         List<String> fullPaths = new ArrayList<>(items.size());
         try {
           log.info("Found collection queue items {} onStart={}", items, onStart);
@@ -941,6 +990,7 @@ public class Overseer implements SolrCloseable {
           } catch (Exception e) {
             log.warn("Delete items failed {}", e.getMessage());
           }
+          ourLock.unlock();
         }
       }
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
index 0ba3acf..3e70f81 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
@@ -56,7 +56,7 @@ public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
 
   @Test
   public void start() throws Exception {
-    int collectionCnt = 40;
+    int collectionCnt = 2;
     List<Future> futures = new ArrayList<>();
     List<Future> indexFutures = new ArrayList<>();
     for (int i = 0; i < collectionCnt; i ++) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 65448e4..a278b0b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1855,7 +1855,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
             }
 
             Replica replica = docCollection.getReplicaById(id);
-            if (log.isDebugEnabled()) log.debug("Got additional state update replica={} id={} ids={} {} {}", replica, id, docCollection.getReplicaByIds(), state == null ? "leader" : state);
+            if (log.isDebugEnabled()) log.debug("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica, id, docCollection.getReplicaByIds());
 
             if (replica != null) {