You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/07/07 13:42:54 UTC

[13/32] lucene-solr:jira/solr-10996: SOLR-10983: Fix DOWNNODE -> queue-work explosion

SOLR-10983: Fix DOWNNODE -> queue-work explosion


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/380eed83
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/380eed83
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/380eed83

Branch: refs/heads/jira/solr-10996
Commit: 380eed838d6646ec02592a9d2e6649e6aa1b5d9b
Parents: d13e70f
Author: Scott Blum <dr...@gmail.com>
Authored: Wed Jun 28 21:38:41 2017 -0400
Committer: Scott Blum <dr...@apache.org>
Committed: Wed Jul 5 20:01:52 2017 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                     |  2 ++
 .../java/org/apache/solr/cloud/DistributedQueue.java |  7 ++-----
 .../src/java/org/apache/solr/cloud/Overseer.java     | 15 +++++++++++----
 3 files changed, 15 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/380eed83/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 5db3af3..85649b4 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -307,6 +307,8 @@ Bug Fixes
 * SOLR-10879: DELETEREPLICA and DELETENODE commands should prevent data loss when
   replicationFactor is 1. (ab)
 
+* SOLR-10983: Fix DOWNNODE -> queue-work explosion (Scott Blum, Joshua Humphries)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/380eed83/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
index 64120ed..cfd3144 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
@@ -228,11 +228,8 @@ public class DistributedQueue {
   }
 
   /**
-   * Inserts data into queue.  Successfully calling this method does NOT guarantee
-   * that the element will be immediately available in the in-memory queue. In particular,
-   * calling this method on an empty queue will not necessarily cause {@link #poll()} to
-   * return the offered element.  Use a blocking method if you must wait for the offered
-   * element to become visible.
+   * Inserts data into the queue.  If there are no other queue consumers, the offered element
+   * will be immediately visible when this method returns.
    */
   public void offer(byte[] data) throws KeeperException, InterruptedException {
     Timer.Context time = stats.time(dir + "_offer");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/380eed83/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 7e1f8c4..05fce30 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -196,7 +196,9 @@ public class Overseer implements Closeable {
             log.error("Exception in Overseer main queue loop", e);
           }
           try {
+            boolean[] itemWasMoved = new boolean[1];
             while (head != null) {
+              itemWasMoved[0] = false;
               byte[] data = head;
               final ZkNodeProps message = ZkNodeProps.load(data);
               log.debug("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
@@ -204,7 +206,11 @@ public class Overseer implements Closeable {
               clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
                 @Override
                 public void onEnqueue() throws Exception {
-                  workQueue.offer(data);
+                  if (!itemWasMoved[0]) {
+                    stateUpdateQueue.poll();
+                    itemWasMoved[0] = true;
+                    workQueue.offer(data);
+                  }
                 }
 
                 @Override
@@ -214,9 +220,10 @@ public class Overseer implements Closeable {
                 }
               });
 
-              // it is safer to keep this poll here because an invalid message might never be queued
-              // and therefore we can't rely on the ZkWriteCallback to remove the item
-              stateUpdateQueue.poll();
+              // If the ZkWriteCallback never fired, just discard the item; it might be an invalid message.
+              if (!itemWasMoved[0]) {
+                stateUpdateQueue.poll();
+              }
 
               if (isClosed) break;
               // if an event comes in the next 100ms batch it together