You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink on the original page) was not preserved in this copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2014/08/08 03:17:28 UTC
svn commit: r1616655 - in /lucene/dev/branches/branch_4x: ./ solr/
solr/CHANGES.txt solr/core/
solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
Author: markrmiller
Date: Fri Aug 8 01:17:27 2014
New Revision: 1616655
URL: http://svn.apache.org/r1616655
Log:
SOLR-6336: DistributedQueue can easily create too many ZooKeeper Watches. (closes #80)
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/solr/ (props changed)
lucene/dev/branches/branch_4x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_4x/solr/core/ (props changed)
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1616655&r1=1616654&r2=1616655&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Fri Aug 8 01:17:27 2014
@@ -173,6 +173,9 @@ Bug Fixes
* SOLR-6163: Correctly decode special characters in managed stopwords and synonym endpoints.
(Vitaliy Zhovtyuk, Timo Schmidt via Timothy Potter)
+* SOLR-6336: DistributedQueue can easily create too many ZooKeeper Watches.
+ (Ramkumar Aiyengar via Mark Miller)
+
Optimizations
---------------------
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1616655&r1=1616654&r2=1616655&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Fri Aug 8 01:17:27 2014
@@ -270,6 +270,7 @@ public class DistributedQueue {
public void await(long timeout) throws InterruptedException {
synchronized (lock) {
+ if (this.event != null) return;
lock.wait(timeout);
}
}
@@ -278,41 +279,60 @@ public class DistributedQueue {
return event;
}
}
-
+
+ // we avoid creating *many* watches in some cases
+ // by saving the childrenWatcher - see SOLR-6336
+ private volatile LatchChildWatcher childrenWatcher;
+ private TreeMap<Long, String> getChildren(long wait) throws InterruptedException, KeeperException
+ {
+ LatchChildWatcher watcher = childrenWatcher;
+ TreeMap<Long,String> children = new TreeMap<> ();
+ if (watcher == null || watcher.getWatchedEvent() != null) {
+ watcher = new LatchChildWatcher();
+ while (true) {
+ try {
+ children = orderedChildren(watcher);
+ break;
+ } catch (KeeperException.NoNodeException e) {
+ zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
+ // go back to the loop and try again
+ }
+ }
+ childrenWatcher = watcher;
+ }
+
+ while (true) {
+ if (!children.isEmpty()) break;
+ watcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
+ if (watcher.getWatchedEvent() != null) {
+ children = orderedChildren(null);
+ }
+ if (wait != Long.MAX_VALUE) break;
+ }
+ return children;
+ }
+
/**
* Removes the head of the queue and returns it, blocks until it succeeds.
*
* @return The former head of the queue
*/
public byte[] take() throws KeeperException, InterruptedException {
- TreeMap<Long,String> orderedChildren;
// Same as for element. Should refactor this.
TimerContext timer = stats.time(dir + "_take");
try {
- while (true) {
- LatchChildWatcher childWatcher = new LatchChildWatcher();
+ TreeMap<Long, String> orderedChildren = getChildren(Long.MAX_VALUE);
+ for (String headNode : orderedChildren.values()) {
+ String path = dir + "/" + headNode;
try {
- orderedChildren = orderedChildren(childWatcher);
+ byte[] data = zookeeper.getData(path, null, null, true);
+ zookeeper.delete(path, -1, true);
+ return data;
} catch (KeeperException.NoNodeException e) {
- zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
- continue;
- }
- if (orderedChildren.size() == 0) {
- childWatcher.await(DEFAULT_TIMEOUT);
- continue;
- }
-
- for (String headNode : orderedChildren.values()) {
- String path = dir + "/" + headNode;
- try {
- byte[] data = zookeeper.getData(path, null, null, true);
- zookeeper.delete(path, -1, true);
- return data;
- } catch (KeeperException.NoNodeException e) {
- // Another client deleted the node first.
- }
+ // Another client deleted the node first.
}
}
+ return null; // shouldn't really reach here..
} finally {
timer.stop();
}
@@ -404,59 +424,36 @@ public class DistributedQueue {
ArrayList<QueueEvent> topN = new ArrayList<>();
LOG.debug("Peeking for top {} elements. ExcludeSet: " + excludeSet.toString());
- boolean waitedEnough = false;
TimerContext time = null;
if (wait == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
else time = stats.time(dir + "_peekTopN_wait" + wait);
try {
- TreeMap<Long, String> orderedChildren;
- while (true) {
- LatchChildWatcher childWatcher = new LatchChildWatcher();
- try {
- orderedChildren = orderedChildren(childWatcher);
- } catch (KeeperException.NoNodeException e) {
- zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
- continue;
- }
-
- if (orderedChildren.size() == 0) {
- if(waitedEnough) return null;
- childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
- waitedEnough = wait != Long.MAX_VALUE;
- continue;
- }
-
- for (String headNode : orderedChildren.values()) {
- if (headNode != null && topN.size() < n) {
- try {
- String id = dir + "/" + headNode;
- if (excludeSet != null && excludeSet.contains(id)) continue;
- QueueEvent queueEvent = new QueueEvent(id,
- zookeeper.getData(dir + "/" + headNode, null, null, true), null);
- topN.add(queueEvent);
- } catch (KeeperException.NoNodeException e) {
- // Another client removed the node first, try next
- }
- } else {
- if (topN.size() >= 1) {
- printQueueEventsListElementIds(topN);
- return topN;
- }
+ TreeMap<Long, String> orderedChildren = getChildren(wait);
+ for (String headNode : orderedChildren.values()) {
+ if (headNode != null && topN.size() < n) {
+ try {
+ String id = dir + "/" + headNode;
+ if (excludeSet != null && excludeSet.contains(id)) continue;
+ QueueEvent queueEvent = new QueueEvent(id,
+ zookeeper.getData(dir + "/" + headNode, null, null, true), null);
+ topN.add(queueEvent);
+ } catch (KeeperException.NoNodeException e) {
+ // Another client removed the node first, try next
+ }
+ } else {
+ if (topN.size() >= 1) {
+ printQueueEventsListElementIds(topN);
+ return topN;
}
}
+ }
- if (topN.size() > 0 ) {
- printQueueEventsListElementIds(topN);
- return topN;
- }
- if (waitedEnough) {
- LOG.debug("Waited enough, returning null after peekTopN");
- return null;
- }
- childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
- waitedEnough = wait != Long.MAX_VALUE;
+ if (topN.size() > 0 ) {
+ printQueueEventsListElementIds(topN);
+ return topN;
}
+ return null;
} finally {
time.stop();
}
@@ -559,7 +556,7 @@ public class DistributedQueue {
public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
return peek(block ? Long.MAX_VALUE : 0);
}
-
+
/**
* Returns the data at the first element of the queue, or null if the queue is
* empty after wait ms.
@@ -579,35 +576,17 @@ public class DistributedQueue {
return element();
}
- TreeMap<Long, String> orderedChildren;
- boolean waitedEnough = false;
- while (true) {
- LatchChildWatcher childWatcher = new LatchChildWatcher();
+ TreeMap<Long, String> orderedChildren = getChildren(wait);
+ for (String headNode : orderedChildren.values()) {
+ String path = dir + "/" + headNode;
try {
- orderedChildren = orderedChildren(childWatcher);
+ byte[] data = zookeeper.getData(path, null, null, true);
+ return new QueueEvent(path, data, null);
} catch (KeeperException.NoNodeException e) {
- zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
- continue;
- }
- if (waitedEnough) {
- if (orderedChildren.isEmpty()) return null;
- }
- if (orderedChildren.size() == 0) {
- childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
- waitedEnough = wait != Long.MAX_VALUE;
- continue;
- }
-
- for (String headNode : orderedChildren.values()) {
- String path = dir + "/" + headNode;
- try {
- byte[] data = zookeeper.getData(path, null, null, true);
- return new QueueEvent(path, data, childWatcher.getWatchedEvent());
- } catch (KeeperException.NoNodeException e) {
- // Another client deleted the node first.
- }
+ // Another client deleted the node first.
}
}
+ return null;
} finally {
time.stop();
}