You are viewing a plain-text version of this content; the link to its canonical version is not included in this copy.
Posted to commits@lucene.apache.org by dr...@apache.org on 2016/06/07 21:02:05 UTC

lucene-solr:SOLR-9191: SOLR-9191: OverseerTaskQueue.peekTopN() fatally flawed (Scott Blum, Noble Paul)

Repository: lucene-solr
Updated Branches:
  refs/heads/SOLR-9191 [created] 1ddb82b9b


SOLR-9191: OverseerTaskQueue.peekTopN() fatally flawed (Scott Blum, Noble Paul)


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/1ddb82b9
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/1ddb82b9
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/1ddb82b9

Branch: refs/heads/SOLR-9191
Commit: 1ddb82b9be74b10135583c7f46a2e04f629bb04a
Parents: 6ffd1ba
Author: Scott Blum <dr...@gmail.com>
Authored: Tue Jun 7 01:52:16 2016 -0400
Committer: Scott Blum <dr...@apache.org>
Committed: Tue Jun 7 17:01:29 2016 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  2 +
 .../org/apache/solr/cloud/DistributedQueue.java | 80 ++++++++++++--------
 .../solr/cloud/OverseerTaskProcessor.java       |  4 +-
 .../apache/solr/cloud/OverseerTaskQueue.java    | 38 +++-------
 4 files changed, 61 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ddb82b9/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 56c111b..2e0b7b3 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -255,6 +255,8 @@ Bug Fixes
 
 * SOLR-8676: keep LOG4J_CONFIG in solr.cmd (Kristine Jetzke via Mikhail Khludnev)
 
+* SOLR-9191: OverseerTaskQueue.peekTopN() fatally flawed (Scott Blum, Noble Paul)
+
 Optimizations
 ----------------------
 * SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ddb82b9/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
index e424b7e..b430ae5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
@@ -17,14 +17,16 @@
 package org.apache.solr.cloud;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -32,6 +34,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.util.Pair;
 import org.apache.solr.util.stats.TimerContext;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -80,21 +83,15 @@ public class DistributedQueue {
   private TreeSet<String> knownChildren = new TreeSet<>();
 
   /**
-   * Used to wait on a non-empty queue; you must hold {@link #updateLock} and verify that
-   * {@link #knownChildren} is empty before waiting on this condition.
+   * Used to wait on ZK changes to the child list; you must hold {@link #updateLock} before waiting on this condition.
    */
-  private final Condition notEmpty = updateLock.newCondition();
+  private final Condition changed = updateLock.newCondition();
 
   /**
-   * If non-null, the last watcher to listen for child changes.
+   * If non-null, the last watcher to listen for child changes.  If null, the in-memory contents are dirty.
    */
   private ChildWatcher lastWatcher = null;
 
-  /**
-   * If true, ZK's child list probably doesn't match what's in memory.
-   */
-  private boolean isDirty = true;
-
   public DistributedQueue(SolrZkClient zookeeper, String dir) {
     this(zookeeper, dir, new Overseer.Stats());
   }
@@ -165,7 +162,7 @@ public class DistributedQueue {
         if (result != null) {
           return result;
         }
-        waitNanos = notEmpty.awaitNanos(waitNanos);
+        waitNanos = changed.awaitNanos(waitNanos);
       }
       return null;
     } finally {
@@ -222,7 +219,7 @@ public class DistributedQueue {
         if (result != null) {
           return result;
         }
-        notEmpty.await();
+        changed.await();
       }
     } finally {
       updateLock.unlock();
@@ -273,25 +270,19 @@ public class DistributedQueue {
   private String firstChild(boolean remove) throws KeeperException, InterruptedException {
     updateLock.lockInterruptibly();
     try {
-      // Try to fetch the first in-memory child.
-      if (!knownChildren.isEmpty()) {
+      // If we're not in a dirty state, and we have in-memory children, return from in-memory.
+      if (lastWatcher != null && !knownChildren.isEmpty()) {
         return remove ? knownChildren.pollFirst() : knownChildren.first();
       }
 
-      if (lastWatcher != null && !isDirty) {
-        // No children, no known updates, and a watcher is already set; nothing we can do.
-        return null;
-      }
-
       // Try to fetch an updated list of children from ZK.
       ChildWatcher newWatcher = new ChildWatcher();
       knownChildren = fetchZkChildren(newWatcher);
       lastWatcher = newWatcher; // only set after fetchZkChildren returns successfully
-      isDirty = false;
       if (knownChildren.isEmpty()) {
         return null;
       }
-      notEmpty.signalAll();
+      changed.signalAll();
       return remove ? knownChildren.pollFirst() : knownChildren.first();
     } finally {
       updateLock.unlock();
@@ -325,26 +316,51 @@ public class DistributedQueue {
   }
 
   /**
-   * Return the currently-known set of children from memory. If there are no children,
-   * waits up to {@code waitMillis} for at least one child to become available. May
-   * update the set of known children.
+   * Return the currently-known set of elements, using child names from memory. If no children are found, or no
+   * children pass {@code acceptFilter}, waits up to {@code waitMillis} for at least one child to become available.
+   * <p>
+   * Package-private to support {@link OverseerTaskQueue} specifically.
    */
-  SortedSet<String> getChildren(long waitMillis) throws KeeperException, InterruptedException {
+  Collection<Pair<String, byte[]>> peekElements(int n, long waitMillis, Function<String, Boolean> acceptFilter) throws KeeperException, InterruptedException {
+    List<Pair<String, byte[]>> result = new ArrayList<>();
     long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
     while (waitNanos > 0) {
       // Trigger a fetch if needed.
       firstElement();
+
+      TreeSet<String> childCopy;
       updateLock.lockInterruptibly();
       try {
-        if (!knownChildren.isEmpty()) {
-          return new TreeSet<>(knownChildren);
-        }
-        waitNanos = notEmpty.awaitNanos(waitNanos);
+        childCopy = new TreeSet<>(knownChildren);
       } finally {
         updateLock.unlock();
       }
+
+      for (String child : childCopy) {
+        if (acceptFilter.apply(child)) {
+          try {
+            byte[] data = zookeeper.getData(dir + "/" + child, null, null, true);
+            result.add(new Pair<>(child, data));
+            if (result.size() >= n) {
+              return result;
+            }
+          } catch (KeeperException.NoNodeException e) {
+            // Another client deleted the node first, remove the in-memory and retry.
+            updateLock.lockInterruptibly();
+            try {
+              knownChildren.remove(child);
+            } finally {
+              updateLock.unlock();
+            }
+          }
+        }
+      }
+      if (!result.isEmpty()) {
+        return result;
+      }
+      waitNanos = changed.awaitNanos(waitNanos);
     }
-    return Collections.emptySortedSet();
+    return Collections.emptyList();
   }
 
   /**
@@ -418,10 +434,8 @@ public class DistributedQueue {
         if (lastWatcher == this) {
           lastWatcher = null;
         }
-        // Do no updates in this thread, just signal state back to client threads.
-        isDirty = true;
         // optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
-        notEmpty.signalAll();
+        changed.signalAll();
       } finally {
         updateLock.unlock();
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ddb82b9/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 93a7e6f..0c28c84 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -190,9 +190,9 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
             cleanUpWorkQueue();
 
           List<QueueEvent> heads = workQueue.peekTopN(MAX_PARALLEL_TASKS, runningZKTasks, 2000L);
-
-          if (heads == null)
+          if (heads.isEmpty()) {
             continue;
+          }
 
           log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString());
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ddb82b9/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 4cee814..ee9cf05 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -17,7 +17,6 @@
 package org.apache.solr.cloud;
 
 import java.lang.invoke.MethodHandles;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -25,6 +24,7 @@ import java.util.TreeSet;
 
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.util.Pair;
 import org.apache.solr.util.stats.TimerContext;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -227,44 +227,26 @@ public class OverseerTaskQueue extends DistributedQueue {
     ArrayList<QueueEvent> topN = new ArrayList<>();
 
     LOG.debug("Peeking for top {} elements. ExcludeSet: {}", n, excludeSet);
-    TimerContext time = null;
+    TimerContext time;
     if (waitMillis == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
     else time = stats.time(dir + "_peekTopN_wait" + waitMillis);
 
     try {
-      for (String headNode : getChildren(waitMillis)) {
-        if (topN.size() < n) {
-          try {
-            String id = dir + "/" + headNode;
-            if (excludeSet.contains(id)) continue;
-            QueueEvent queueEvent = new QueueEvent(id,
-                zookeeper.getData(dir + "/" + headNode, null, null, true), null);
-            topN.add(queueEvent);
-          } catch (KeeperException.NoNodeException e) {
-            // Another client removed the node first, try next
-          }
-        } else {
-          if (topN.size() >= 1) {
-            printQueueEventsListElementIds(topN);
-            return topN;
-          }
-        }
-      }
-
-      if (topN.size() > 0 ) {
-        printQueueEventsListElementIds(topN);
-        return topN;
+      for (Pair<String, byte[]> element : peekElements(n, waitMillis, child -> !excludeSet.contains(dir + "/" + child))) {
+        topN.add(new QueueEvent(dir + "/" + element.first(),
+            element.second(), null));
       }
-      return null;
+      printQueueEventsListElementIds(topN);
+      return topN;
     } finally {
       time.stop();
     }
   }
 
   private static void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
-    if(LOG.isDebugEnabled()) {
-      StringBuffer sb = new StringBuffer("[");
-      for(QueueEvent queueEvent: topN) {
+    if (LOG.isDebugEnabled() && !topN.isEmpty()) {
+      StringBuilder sb = new StringBuilder("[");
+      for (QueueEvent queueEvent : topN) {
         sb.append(queueEvent.getId()).append(", ");
       }
       sb.append("]");