You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dr...@apache.org on 2017/12/08 19:21:07 UTC
lucene-solr:branch_7x: SOLR-11423: Overseer queue needs a hard cap (maximum size) that clients respect
Repository: lucene-solr
Updated Branches:
refs/heads/branch_7x 1d08376b1 -> 311105f1b
SOLR-11423: Overseer queue needs a hard cap (maximum size) that clients respect
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/311105f1
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/311105f1
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/311105f1
Branch: refs/heads/branch_7x
Commit: 311105f1b0dad7f20ff6cd55be1d2eb9cd4246d6
Parents: 1d08376
Author: Scott Blum <dr...@apache.org>
Authored: Mon Oct 2 16:50:57 2017 -0400
Committer: Scott Blum <dr...@apache.org>
Committed: Fri Dec 8 13:33:42 2017 -0500
----------------------------------------------------------------------
solr/CHANGES.txt | 2 ++
.../java/org/apache/solr/cloud/Overseer.java | 2 +-
.../apache/solr/cloud/ZkDistributedQueue.java | 28 ++++++++++++++++++++
3 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/311105f1/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index b253cc9..28f962a 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -144,6 +144,8 @@ New Features
Bug Fixes
----------------------
+* SOLR-11423: Overseer queue needs a hard cap (maximum size) that clients respect (Scott Blum, Joshua Humphries, Noble Paul)
+
* SOLR-11445: Overseer should not hang when process bad message. (Cao Manh Dat, shalin)
* SOLR-11447: ZkStateWriter should process commands in atomic. (Cao Manh Dat, shalin)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/311105f1/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index f5db42a..3b65d6f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -642,7 +642,7 @@ public class Overseer implements SolrCloseable {
*/
static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
- return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats);
+ return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats, STATE_UPDATE_MAX_QUEUE);
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/311105f1/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 7255ccb..3a7c750 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
@@ -44,6 +45,7 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,6 +99,13 @@ public class ZkDistributedQueue implements DistributedQueue {
private int watcherCount = 0;
+ private final int maxQueueSize;
+
+ /**
+ * If {@link #maxQueueSize} is set, the number of items we can queue without rechecking the server.
+ */
+ private final AtomicInteger offerPermits = new AtomicInteger(0);
+
public ZkDistributedQueue(SolrZkClient zookeeper, String dir) {
this(zookeeper, dir, new Stats());
}
@@ -120,6 +129,7 @@ public class ZkDistributedQueue implements DistributedQueue {
this.zookeeper = zookeeper;
this.stats = stats;
+ this.maxQueueSize = maxQueueSize;
}
/**
@@ -288,6 +298,24 @@ public class ZkDistributedQueue implements DistributedQueue {
try {
while (true) {
try {
+ if (maxQueueSize > 0) {
+ if (offerPermits.get() <= 0 || offerPermits.getAndDecrement() <= 0) {
+ // If a max queue size is set, check it before creating a new queue item.
+ Stat stat = zookeeper.exists(dir, null, true);
+ if (stat == null) {
+ // jump to the code below, which tries to create dir if it doesn't exist
+ throw new KeeperException.NoNodeException();
+ }
+ int remainingCapacity = maxQueueSize - stat.getNumChildren();
+ if (remainingCapacity <= 0) {
+ throw new IllegalStateException("queue is full");
+ }
+
+ // Allow this client to push up to 1% of the remaining queue capacity without rechecking.
+ offerPermits.set(remainingCapacity / 100);
+ }
+ }
+
// Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
// This will get set again when the watcher actually fires, but that's ok.
zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);