You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/17 15:22:39 UTC
[lucene-solr] 05/18: @840 Change overseer queue polling closer to desired.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 6d9c9cc785dbe04a35bee4ecba5488bbcc39c47e
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Sep 16 18:39:40 2020 -0500
@840 Change overseer queue polling closer to desired.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 20 ++++++++++++--------
.../OverseerCollectionConfigSetProcessorTest.java | 2 --
2 files changed, 12 insertions(+), 10 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 0e9b89d..9b75be1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -307,16 +307,14 @@ public class Overseer implements SolrCloseable {
// } else {
// wait = 0;
// }
- queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> true));
- } catch (InterruptedException | AlreadyClosedException e) {
- ParWork.propagateInterrupt(e, true);
+ queue = new LinkedList<>(stateUpdateQueue.peekElements(10000, wait, (x) -> true));
+ } catch (AlreadyClosedException e) {
return;
} catch (KeeperException.SessionExpiredException e) {
- log.error("run()", e);
-
log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
return;
} catch (Exception e) {
+ ParWork.propagateInterrupt("Unexpected error in Overseer state update loop", e, true);
log.error("Unexpected error in Overseer state update loop", e);
if (!isClosed()) {
continue;
@@ -326,11 +324,13 @@ public class Overseer implements SolrCloseable {
}
try {
Set<String> processedNodes = new HashSet<>();
+ int loopCnt = 0;
while (queue != null && !queue.isEmpty()) {
- if (Thread.currentThread().isInterrupted() || isClosed) {
+ if (isClosed()) {
log.info("Closing");
return;
}
+
for (Pair<String, byte[]> head : queue) {
byte[] data = head.second();
final ZkNodeProps message = ZkNodeProps.load(data);
@@ -352,7 +352,11 @@ public class Overseer implements SolrCloseable {
// } else {
// wait = 0;
// }
- queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, node -> !processedNodes.contains(node)));
+ queue = new LinkedList<>(stateUpdateQueue.peekElements(100, wait, node -> !processedNodes.contains(node)));
+ if (loopCnt >= 1) {
+ break;
+ }
+ loopCnt++;
}
fallbackQueueSize = processedNodes.size();
// we should force write all pending updates because the next iteration might sleep until there
@@ -920,7 +924,7 @@ public class Overseer implements SolrCloseable {
@Override
public boolean isClosed() {
- return closed || zkController.getCoreContainer().isShutDown();
+ return closed;
}
void doClose() {
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index d78dfa1..01a6e5a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -446,8 +446,6 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
stateUpdateQueueMock.offer(invocation.getArgument(0));
} catch (KeeperException e) {
throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
}
return null;
}}).when(overseerMock).offerStateUpdate(any());