You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/17 15:22:39 UTC

[lucene-solr] 05/18: @840 Change overseer queue polling closer to desired.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 6d9c9cc785dbe04a35bee4ecba5488bbcc39c47e
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Sep 16 18:39:40 2020 -0500

    @840 Change overseer queue polling closer to desired.
---
 .../src/java/org/apache/solr/cloud/Overseer.java     | 20 ++++++++++++--------
 .../OverseerCollectionConfigSetProcessorTest.java    |  2 --
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 0e9b89d..9b75be1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -307,16 +307,14 @@ public class Overseer implements SolrCloseable {
 //            } else {
 //              wait = 0;
 //            }
-            queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> true));
-          } catch (InterruptedException | AlreadyClosedException e) {
-            ParWork.propagateInterrupt(e, true);
+            queue = new LinkedList<>(stateUpdateQueue.peekElements(10000, wait, (x) -> true));
+          } catch (AlreadyClosedException e) {
             return;
           } catch (KeeperException.SessionExpiredException e) {
-            log.error("run()", e);
-
             log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
             return;
           } catch (Exception e) {
+            ParWork.propagateInterrupt("Unexpected error in Overseer state update loop", e, true);
             log.error("Unexpected error in Overseer state update loop", e);
             if (!isClosed()) {
               continue;
@@ -326,11 +324,13 @@ public class Overseer implements SolrCloseable {
           }
           try {
             Set<String> processedNodes = new HashSet<>();
+            int loopCnt = 0;
             while (queue != null && !queue.isEmpty()) {
-              if (Thread.currentThread().isInterrupted() || isClosed) {
+              if (isClosed()) {
                 log.info("Closing");
                 return;
               }
+
               for (Pair<String, byte[]> head : queue) {
                 byte[] data = head.second();
                 final ZkNodeProps message = ZkNodeProps.load(data);
@@ -352,7 +352,11 @@ public class Overseer implements SolrCloseable {
 //              } else {
 //                wait = 0;
 //              }
-              queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, node -> !processedNodes.contains(node)));
+              queue = new LinkedList<>(stateUpdateQueue.peekElements(100, wait, node -> !processedNodes.contains(node)));
+              if (loopCnt >= 1) {
+                break;
+              }
+              loopCnt++;
             }
             fallbackQueueSize = processedNodes.size();
             // we should force write all pending updates because the next iteration might sleep until there
@@ -920,7 +924,7 @@ public class Overseer implements SolrCloseable {
 
   @Override
   public boolean isClosed() {
-    return closed || zkController.getCoreContainer().isShutDown();
+    return closed;
   }
 
   void doClose() {
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index d78dfa1..01a6e5a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -446,8 +446,6 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
               stateUpdateQueueMock.offer(invocation.getArgument(0));
             } catch (KeeperException e) {
               throw new RuntimeException(e);
-            } catch (InterruptedException e) {
-              throw new RuntimeException(e);
             }
             return null;
           }}).when(overseerMock).offerStateUpdate(any());