You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2014/08/08 03:17:28 UTC

svn commit: r1616655 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/CHANGES.txt solr/core/ solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java

Author: markrmiller
Date: Fri Aug  8 01:17:27 2014
New Revision: 1616655

URL: http://svn.apache.org/r1616655
Log:
SOLR-6336: DistributedQueue can easily create too many ZooKeeper Watches. (closes #80)

Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java

Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1616655&r1=1616654&r2=1616655&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Fri Aug  8 01:17:27 2014
@@ -173,6 +173,9 @@ Bug Fixes
 * SOLR-6163: Correctly decode special characters in managed stopwords and synonym endpoints.
   (Vitaliy Zhovtyuk, Timo Schmidt via Timothy Potter)
 
+* SOLR-6336: DistributedQueue can easily create too many ZooKeeper Watches.
+  (Ramkumar Aiyengar via Mark Miller)
+
 Optimizations
 ---------------------
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1616655&r1=1616654&r2=1616655&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Fri Aug  8 01:17:27 2014
@@ -270,6 +270,7 @@ public class DistributedQueue {
     
     public void await(long timeout) throws InterruptedException {
       synchronized (lock) {
+        if (this.event != null) return;
         lock.wait(timeout);
       }
     }
@@ -278,41 +279,60 @@ public class DistributedQueue {
       return event;
     }
   }
-  
+
+  // Cache the children watcher and reuse it until it has fired, so that repeated
+  // calls do not each register a new ZooKeeper watch - see SOLR-6336
+  private volatile LatchChildWatcher childrenWatcher;
+  private TreeMap<Long, String> getChildren(long wait) throws InterruptedException, KeeperException
+  {
+    LatchChildWatcher watcher = childrenWatcher;
+    TreeMap<Long,String> children = new TreeMap<> ();
+    if (watcher == null ||  watcher.getWatchedEvent() != null) {
+      watcher = new LatchChildWatcher();
+      while (true) {
+        try {
+          children = orderedChildren(watcher);
+          break;
+        } catch (KeeperException.NoNodeException e) {
+          zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
+          // go back to the loop and try again
+        }
+      }
+      childrenWatcher = watcher;
+    }
+
+    while (true) {
+      if (!children.isEmpty()) break;
+      watcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
+      if (watcher.getWatchedEvent() != null) {
+        children = orderedChildren(null);
+      }
+      if (wait != Long.MAX_VALUE) break;
+    }
+    return children;
+  }
+
   /**
    * Removes the head of the queue and returns it, blocks until it succeeds.
    * 
    * @return The former head of the queue
    */
   public byte[] take() throws KeeperException, InterruptedException {
-    TreeMap<Long,String> orderedChildren;
     // Same as for element. Should refactor this.
     TimerContext timer = stats.time(dir + "_take");
     try {
-      while (true) {
-        LatchChildWatcher childWatcher = new LatchChildWatcher();
+      TreeMap<Long, String> orderedChildren = getChildren(Long.MAX_VALUE);
+      for (String headNode : orderedChildren.values()) {
+        String path = dir + "/" + headNode;
         try {
-          orderedChildren = orderedChildren(childWatcher);
+          byte[] data = zookeeper.getData(path, null, null, true);
+          zookeeper.delete(path, -1, true);
+          return data;
         } catch (KeeperException.NoNodeException e) {
-          zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
-          continue;
-        }
-        if (orderedChildren.size() == 0) {
-          childWatcher.await(DEFAULT_TIMEOUT);
-          continue;
-        }
-
-        for (String headNode : orderedChildren.values()) {
-          String path = dir + "/" + headNode;
-          try {
-            byte[] data = zookeeper.getData(path, null, null, true);
-            zookeeper.delete(path, -1, true);
-            return data;
-          } catch (KeeperException.NoNodeException e) {
-            // Another client deleted the node first.
-          }
+          // Another client deleted the node first.
         }
       }
+      return null; // shouldn't really reach here..
     } finally {
       timer.stop();
     }
@@ -404,59 +424,36 @@ public class DistributedQueue {
     ArrayList<QueueEvent> topN = new ArrayList<>();
 
     LOG.debug("Peeking for top {} elements. ExcludeSet: " + excludeSet.toString());
-    boolean waitedEnough = false;
     TimerContext time = null;
     if (wait == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
     else time = stats.time(dir + "_peekTopN_wait" + wait);
 
     try {
-      TreeMap<Long, String> orderedChildren;
-      while (true) {
-        LatchChildWatcher childWatcher = new LatchChildWatcher();
-        try {
-          orderedChildren = orderedChildren(childWatcher);
-        } catch (KeeperException.NoNodeException e) {
-          zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
-          continue;
-        }
-
-        if (orderedChildren.size() == 0) {
-          if(waitedEnough) return null;
-          childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
-          waitedEnough = wait != Long.MAX_VALUE;
-          continue;
-        }
-
-        for (String headNode : orderedChildren.values()) {
-          if (headNode != null && topN.size() < n) {
-            try {
-              String id = dir + "/" + headNode;
-              if (excludeSet != null && excludeSet.contains(id)) continue;
-              QueueEvent queueEvent = new QueueEvent(id,
-                  zookeeper.getData(dir + "/" + headNode, null, null, true), null);
-              topN.add(queueEvent);
-            } catch (KeeperException.NoNodeException e) {
-              // Another client removed the node first, try next
-            }
-          } else {
-            if (topN.size() >= 1) {
-              printQueueEventsListElementIds(topN);
-              return topN;
-            }
+      TreeMap<Long, String> orderedChildren = getChildren(wait);
+      for (String headNode : orderedChildren.values()) {
+        if (headNode != null && topN.size() < n) {
+          try {
+            String id = dir + "/" + headNode;
+            if (excludeSet != null && excludeSet.contains(id)) continue;
+            QueueEvent queueEvent = new QueueEvent(id,
+                zookeeper.getData(dir + "/" + headNode, null, null, true), null);
+            topN.add(queueEvent);
+          } catch (KeeperException.NoNodeException e) {
+            // Another client removed the node first, try next
+          }
+        } else {
+          if (topN.size() >= 1) {
+            printQueueEventsListElementIds(topN);
+            return topN;
           }
         }
+      }
 
-        if (topN.size() > 0 ) {
-          printQueueEventsListElementIds(topN);
-          return topN;
-        }
-        if (waitedEnough) {
-          LOG.debug("Waited enough, returning null after peekTopN");
-          return null;
-        }
-        childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
-        waitedEnough = wait != Long.MAX_VALUE;
+      if (topN.size() > 0 ) {
+        printQueueEventsListElementIds(topN);
+        return topN;
       }
+      return null;
     } finally {
       time.stop();
     }
@@ -559,7 +556,7 @@ public class DistributedQueue {
   public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
     return peek(block ? Long.MAX_VALUE : 0);
   }
-  
+
   /**
    * Returns the data at the first element of the queue, or null if the queue is
    * empty after wait ms.
@@ -579,35 +576,17 @@ public class DistributedQueue {
         return element();
       }
 
-      TreeMap<Long, String> orderedChildren;
-      boolean waitedEnough = false;
-      while (true) {
-        LatchChildWatcher childWatcher = new LatchChildWatcher();
+      TreeMap<Long, String> orderedChildren = getChildren(wait);
+      for (String headNode : orderedChildren.values()) {
+        String path = dir + "/" + headNode;
         try {
-          orderedChildren = orderedChildren(childWatcher);
+          byte[] data = zookeeper.getData(path, null, null, true);
+          return new QueueEvent(path, data, null);
         } catch (KeeperException.NoNodeException e) {
-          zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
-          continue;
-        }
-        if (waitedEnough) {
-          if (orderedChildren.isEmpty()) return null;
-        }
-        if (orderedChildren.size() == 0) {
-          childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
-          waitedEnough = wait != Long.MAX_VALUE;
-          continue;
-        }
-
-        for (String headNode : orderedChildren.values()) {
-          String path = dir + "/" + headNode;
-          try {
-            byte[] data = zookeeper.getData(path, null, null, true);
-            return new QueueEvent(path, data, childWatcher.getWatchedEvent());
-          } catch (KeeperException.NoNodeException e) {
-            // Another client deleted the node first.
-          }
+          // Another client deleted the node first.
         }
       }
+      return null;
     } finally {
       time.stop();
     }