You are viewing a plain-text version of this content; the link to the canonical version is not included in this copy.
Posted to commits@lucene.apache.org by dr...@apache.org on 2017/04/18 19:38:16 UTC
lucene-solr:branch_5x: SOLR-10420: fix watcher leak in DistributedQueue
Repository: lucene-solr
Updated Branches:
refs/heads/branch_5x dee7d7749 -> 42d08dd28
SOLR-10420: fix watcher leak in DistributedQueue
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/42d08dd2
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/42d08dd2
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/42d08dd2
Branch: refs/heads/branch_5x
Commit: 42d08dd28c6609a2c70a691e6a88725c9aa31377
Parents: dee7d77
Author: Scott Blum <dr...@gmail.com>
Authored: Mon Apr 17 18:27:12 2017 -0400
Committer: Scott Blum <dr...@gmail.com>
Committed: Tue Apr 18 15:05:52 2017 -0400
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../org/apache/solr/cloud/DistributedQueue.java | 56 +++++++++++++-------
.../apache/solr/cloud/DistributedQueueTest.java | 50 ++++++++++++++++-
3 files changed, 86 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/42d08dd2/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 4fff68f..e666e5a 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -22,6 +22,8 @@ New Features
Bug Fixes
----------------------
+* SOLR-10420: Solr 6.x leaking one SolrZkClient instance per second (Scott Blum, Cao Manh Dat, Markus Jelsma, Steve Rowe)
+
Other Changes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/42d08dd2/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
index 7576ae5..dfc8784 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
@@ -86,10 +86,9 @@ public class DistributedQueue {
*/
private final Condition changed = updateLock.newCondition();
- /**
- * If non-null, the last watcher to listen for child changes. If null, the in-memory contents are dirty.
- */
- private ChildWatcher lastWatcher = null;
+ private boolean isDirty = true;
+
+ private int watcherCount = 0;
public DistributedQueue(SolrZkClient zookeeper, String dir) {
this(zookeeper, dir, new Overseer.Stats());
@@ -238,10 +237,10 @@ public class DistributedQueue {
try {
while (true) {
try {
- // We don't need to explicitly set isDirty here; if there is a watcher, it will
- // see the update and set the bit itself; if there is no watcher we can defer
- // the update anyway.
+ // Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
+ // This will get set again when the watcher actually fires, but that's ok.
zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
+ isDirty = true;
return;
} catch (KeeperException.NoNodeException e) {
try {
@@ -269,15 +268,25 @@ public class DistributedQueue {
private String firstChild(boolean remove) throws KeeperException, InterruptedException {
updateLock.lockInterruptibly();
try {
- // If we're not in a dirty state, and we have in-memory children, return from in-memory.
- if (lastWatcher != null && !knownChildren.isEmpty()) {
- return remove ? knownChildren.pollFirst() : knownChildren.first();
+ if (!isDirty) {
+ // If we're not in a dirty state...
+ if (!knownChildren.isEmpty()) {
+ // and we have in-memory children, return from in-memory.
+ return remove ? knownChildren.pollFirst() : knownChildren.first();
+ } else {
+ // otherwise there's nothing to return
+ return null;
+ }
}
- // Try to fetch an updated list of children from ZK.
- ChildWatcher newWatcher = new ChildWatcher();
+ // Dirty, try to fetch an updated list of children from ZK.
+ // Only set a new watcher if there isn't already a watcher.
+ ChildWatcher newWatcher = (watcherCount == 0) ? new ChildWatcher() : null;
knownChildren = fetchZkChildren(newWatcher);
- lastWatcher = newWatcher; // only set after fetchZkChildren returns successfully
+ if (newWatcher != null) {
+ watcherCount++; // watcher was successfully set
+ }
+ isDirty = false;
if (knownChildren.isEmpty()) {
return null;
}
@@ -422,16 +431,25 @@ public class DistributedQueue {
}
}
- @VisibleForTesting boolean hasWatcher() throws InterruptedException {
+ @VisibleForTesting int watcherCount() throws InterruptedException {
updateLock.lockInterruptibly();
try {
- return lastWatcher != null;
+ return watcherCount;
} finally {
updateLock.unlock();
}
}
- private class ChildWatcher implements Watcher {
+ @VisibleForTesting boolean isDirty() throws InterruptedException {
+ updateLock.lockInterruptibly();
+ try {
+ return isDirty;
+ } finally {
+ updateLock.unlock();
+ }
+ }
+
+ @VisibleForTesting class ChildWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
@@ -441,10 +459,8 @@ public class DistributedQueue {
}
updateLock.lock();
try {
- // this watcher is automatically cleared when fired
- if (lastWatcher == this) {
- lastWatcher = null;
- }
+ isDirty = true;
+ watcherCount--;
// optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
changed.signalAll();
} finally {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/42d08dd2/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index 88bd461..7152c56 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -120,13 +120,15 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
// After draining the queue, a watcher should be set.
assertNull(dq.peek(100));
- assertTrue(dq.hasWatcher());
+ assertFalse(dq.isDirty());
+ assertEquals(1, dq.watcherCount());
forceSessionExpire();
// Session expiry should have fired the watcher.
Thread.sleep(100);
- assertFalse(dq.hasWatcher());
+ assertTrue(dq.isDirty());
+ assertEquals(0, dq.watcherCount());
// Rerun the earlier test make sure updates are still seen, post reconnection.
future = executor.submit(new Callable<String>() {
@@ -150,6 +152,50 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
}
@Test
+ public void testLeakChildWatcher() throws Exception {
+ String dqZNode = "/distqueue/test";
+ DistributedQueue dq = makeDistributedQueue(dqZNode);
+ assertTrue(dq.peekElements(1, 1, Predicates.<String>alwaysTrue()).isEmpty());
+ assertEquals(1, dq.watcherCount());
+ assertFalse(dq.isDirty());
+ assertTrue(dq.peekElements(1, 1, Predicates.<String>alwaysTrue()).isEmpty());
+ assertEquals(1, dq.watcherCount());
+ assertFalse(dq.isDirty());
+ assertNull(dq.peek());
+ assertEquals(1, dq.watcherCount());
+ assertFalse(dq.isDirty());
+ assertNull(dq.peek(10));
+ assertEquals(1, dq.watcherCount());
+ assertFalse(dq.isDirty());
+
+ dq.offer("hello world".getBytes(UTF8));
+ assertNotNull(dq.peek()); // synchronously available
+ // dirty and watcher state indeterminate here, race with watcher
+ Thread.sleep(100); // watcher should have fired now
+ assertNotNull(dq.peek());
+ assertEquals(1, dq.watcherCount());
+ assertFalse(dq.isDirty());
+ assertFalse(dq.peekElements(1, 1, Predicates.<String>alwaysTrue()).isEmpty());
+ assertEquals(1, dq.watcherCount());
+ assertFalse(dq.isDirty());
+ }
+
+ @Test
+ public void testLocallyOffer() throws Exception {
+ String dqZNode = "/distqueue/test";
+ DistributedQueue dq = makeDistributedQueue(dqZNode);
+ dq.peekElements(1, 1, Predicates.<String>alwaysTrue());
+ for (int i = 0; i < 100; i++) {
+ byte[] data = String.valueOf(i).getBytes(UTF8);
+ dq.offer(data);
+ assertNotNull(dq.peek());
+ dq.poll();
+ dq.peekElements(1, 1, Predicates.<String>alwaysTrue());
+ }
+ }
+
+
+ @Test
public void testPeekElements() throws Exception {
String dqZNode = "/distqueue/test";
final byte[] data = "hello world".getBytes(UTF8);