You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@lucene.apache.org by dr...@apache.org on 2017/12/08 19:21:07 UTC

lucene-solr:branch_7x: SOLR-11423: Overseer queue needs a hard cap (maximum size) that clients respect

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x 1d08376b1 -> 311105f1b


SOLR-11423: Overseer queue needs a hard cap (maximum size) that clients respect


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/311105f1
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/311105f1
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/311105f1

Branch: refs/heads/branch_7x
Commit: 311105f1b0dad7f20ff6cd55be1d2eb9cd4246d6
Parents: 1d08376
Author: Scott Blum <dr...@apache.org>
Authored: Mon Oct 2 16:50:57 2017 -0400
Committer: Scott Blum <dr...@apache.org>
Committed: Fri Dec 8 13:33:42 2017 -0500

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  2 ++
 .../java/org/apache/solr/cloud/Overseer.java    |  2 +-
 .../apache/solr/cloud/ZkDistributedQueue.java   | 28 ++++++++++++++++++++
 3 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/311105f1/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index b253cc9..28f962a 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -144,6 +144,8 @@ New Features
 Bug Fixes
 ----------------------
 
+* SOLR-11423: Overseer queue needs a hard cap (maximum size) that clients respect (Scott Blum, Joshua Humphries, Noble Paul)
+
 * SOLR-11445: Overseer should not hang when process bad message. (Cao Manh Dat, shalin) 
 
 * SOLR-11447: ZkStateWriter should process commands in atomic. (Cao Manh Dat, shalin)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/311105f1/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index f5db42a..3b65d6f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -642,7 +642,7 @@ public class Overseer implements SolrCloseable {
    */
   static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) {
     createOverseerNode(zkClient);
-    return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats);
+    return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats, STATE_UPDATE_MAX_QUEUE);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/311105f1/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 7255ccb..3a7c750 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Predicate;
@@ -44,6 +45,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,6 +99,13 @@ public class ZkDistributedQueue implements DistributedQueue {
 
   private int watcherCount = 0;
 
+  private final int maxQueueSize;
+
+  /**
+   * If {@link #maxQueueSize} is set, the number of items we can queue without rechecking the server.
+   */
+  private final AtomicInteger offerPermits = new AtomicInteger(0);
+
   public ZkDistributedQueue(SolrZkClient zookeeper, String dir) {
     this(zookeeper, dir, new Stats());
   }
@@ -120,6 +129,7 @@ public class ZkDistributedQueue implements DistributedQueue {
 
     this.zookeeper = zookeeper;
     this.stats = stats;
+    this.maxQueueSize = maxQueueSize;
   }
 
   /**
@@ -288,6 +298,24 @@ public class ZkDistributedQueue implements DistributedQueue {
     try {
       while (true) {
         try {
+          if (maxQueueSize > 0) {
+            if (offerPermits.get() <= 0 || offerPermits.getAndDecrement() <= 0) {
+              // If a max queue size is set, check it before creating a new queue item.
+              Stat stat = zookeeper.exists(dir, null, true);
+              if (stat == null) {
+                // jump to the code below, which tries to create dir if it doesn't exist
+                throw new KeeperException.NoNodeException();
+              }
+              int remainingCapacity = maxQueueSize - stat.getNumChildren();
+              if (remainingCapacity <= 0) {
+                throw new IllegalStateException("queue is full");
+              }
+
+              // Allow this client to push up to 1% of the remaining queue capacity without rechecking.
+              offerPermits.set(remainingCapacity / 100);
+            }
+          }
+
           // Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
           // This will get set again when the watcher actually fires, but that's ok.
           zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);