You are viewing a plain-text version of this content. The canonical link to it is here.
Posted to commits@lucene.apache.org by dr...@apache.org on 2017/04/18 20:43:53 UTC

lucene-solr:branch_5_5: SOLR-10420: fix watcher leak in DistributedQueue

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_5_5 291b6c883 -> 89beee8d6


SOLR-10420: fix watcher leak in DistributedQueue


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/89beee8d
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/89beee8d
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/89beee8d

Branch: refs/heads/branch_5_5
Commit: 89beee8d61346d50dbbf02f0cc9cfc5032e46eee
Parents: 291b6c8
Author: Scott Blum <dr...@gmail.com>
Authored: Mon Apr 17 18:27:12 2017 -0400
Committer: Scott Blum <dr...@apache.org>
Committed: Tue Apr 18 19:43:36 2017 +0000

----------------------------------------------------------------------
 solr/CHANGES.txt                                | 18 +++++++
 .../org/apache/solr/cloud/DistributedQueue.java | 56 +++++++++++++-------
 .../apache/solr/cloud/DistributedQueueTest.java | 50 ++++++++++++++++-
 3 files changed, 102 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/89beee8d/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f1ee0ca..07c9f3e 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -8,6 +8,24 @@ caching, replication, and a web administration interface.
 
 See http://lucene.apache.org/solr for more information.
 
+======================= 5.5.5 =======================
+
+Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
+
+Versions of Major Components
+---------------------
+Apache Tika 1.7
+Carrot2 3.10.4
+Velocity 1.7 and Velocity Tools 2.0
+Apache UIMA 2.3.1
+Apache ZooKeeper 3.4.6
+Jetty 9.2.13.v20150730
+
+Bug Fixes
+----------------------
+
+* SOLR-10420: Solr 6.x leaking one SolrZkClient instance per second (Scott Blum, Cao Manh Dat, Markus Jelsma, Steve Rowe)
+
 ======================= 5.5.4 =======================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/89beee8d/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
index 7576ae5..dfc8784 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
@@ -86,10 +86,9 @@ public class DistributedQueue {
    */
   private final Condition changed = updateLock.newCondition();
 
-  /**
-   * If non-null, the last watcher to listen for child changes.  If null, the in-memory contents are dirty.
-   */
-  private ChildWatcher lastWatcher = null;
+  private boolean isDirty = true;
+
+  private int watcherCount = 0;
 
   public DistributedQueue(SolrZkClient zookeeper, String dir) {
     this(zookeeper, dir, new Overseer.Stats());
@@ -238,10 +237,10 @@ public class DistributedQueue {
     try {
       while (true) {
         try {
-          // We don't need to explicitly set isDirty here; if there is a watcher, it will
-          // see the update and set the bit itself; if there is no watcher we can defer
-          // the update anyway.
+          // Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
+          // This will get set again when the watcher actually fires, but that's ok.
           zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
+          isDirty = true;
           return;
         } catch (KeeperException.NoNodeException e) {
           try {
@@ -269,15 +268,25 @@ public class DistributedQueue {
   private String firstChild(boolean remove) throws KeeperException, InterruptedException {
     updateLock.lockInterruptibly();
     try {
-      // If we're not in a dirty state, and we have in-memory children, return from in-memory.
-      if (lastWatcher != null && !knownChildren.isEmpty()) {
-        return remove ? knownChildren.pollFirst() : knownChildren.first();
+      if (!isDirty) {
+        // If we're not in a dirty state...
+        if (!knownChildren.isEmpty()) {
+          // and we have in-memory children, return from in-memory.
+          return remove ? knownChildren.pollFirst() : knownChildren.first();
+        } else {
+          // otherwise there's nothing to return
+          return null;
+        }
       }
 
-      // Try to fetch an updated list of children from ZK.
-      ChildWatcher newWatcher = new ChildWatcher();
+      // Dirty, try to fetch an updated list of children from ZK.
+      // Only set a new watcher if there isn't already a watcher.
+      ChildWatcher newWatcher = (watcherCount == 0) ? new ChildWatcher() : null;
       knownChildren = fetchZkChildren(newWatcher);
-      lastWatcher = newWatcher; // only set after fetchZkChildren returns successfully
+      if (newWatcher != null) {
+        watcherCount++; // watcher was successfully set
+      }
+      isDirty = false;
       if (knownChildren.isEmpty()) {
         return null;
       }
@@ -422,16 +431,25 @@ public class DistributedQueue {
     }
   }
 
-  @VisibleForTesting boolean hasWatcher() throws InterruptedException {
+  @VisibleForTesting int watcherCount() throws InterruptedException {
     updateLock.lockInterruptibly();
     try {
-      return lastWatcher != null;
+      return watcherCount;
     } finally {
       updateLock.unlock();
     }
   }
 
-  private class ChildWatcher implements Watcher {
+  @VisibleForTesting boolean isDirty() throws InterruptedException {
+    updateLock.lockInterruptibly();
+    try {
+      return isDirty;
+    } finally {
+      updateLock.unlock();
+    }
+  }
+
+  @VisibleForTesting class ChildWatcher implements Watcher {
 
     @Override
     public void process(WatchedEvent event) {
@@ -441,10 +459,8 @@ public class DistributedQueue {
       }
       updateLock.lock();
       try {
-        // this watcher is automatically cleared when fired
-        if (lastWatcher == this) {
-          lastWatcher = null;
-        }
+        isDirty = true;
+        watcherCount--;
         // optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
         changed.signalAll();
       } finally {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/89beee8d/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index 88bd461..7152c56 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -120,13 +120,15 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
 
     // After draining the queue, a watcher should be set.
     assertNull(dq.peek(100));
-    assertTrue(dq.hasWatcher());
+    assertFalse(dq.isDirty());
+    assertEquals(1, dq.watcherCount());
 
     forceSessionExpire();
 
     // Session expiry should have fired the watcher.
     Thread.sleep(100);
-    assertFalse(dq.hasWatcher());
+    assertTrue(dq.isDirty());
+    assertEquals(0, dq.watcherCount());
 
     // Rerun the earlier test make sure updates are still seen, post reconnection.
     future = executor.submit(new Callable<String>() {
@@ -150,6 +152,50 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
   }
 
   @Test
+  public void testLeakChildWatcher() throws Exception {
+    String dqZNode = "/distqueue/test";
+    DistributedQueue dq = makeDistributedQueue(dqZNode);
+    assertTrue(dq.peekElements(1, 1, Predicates.<String>alwaysTrue()).isEmpty());
+    assertEquals(1, dq.watcherCount());
+    assertFalse(dq.isDirty());
+    assertTrue(dq.peekElements(1, 1, Predicates.<String>alwaysTrue()).isEmpty());
+    assertEquals(1, dq.watcherCount());
+    assertFalse(dq.isDirty());
+    assertNull(dq.peek());
+    assertEquals(1, dq.watcherCount());
+    assertFalse(dq.isDirty());
+    assertNull(dq.peek(10));
+    assertEquals(1, dq.watcherCount());
+    assertFalse(dq.isDirty());
+
+    dq.offer("hello world".getBytes(UTF8));
+    assertNotNull(dq.peek()); // synchronously available
+    // dirty and watcher state indeterminate here, race with watcher
+    Thread.sleep(100); // watcher should have fired now
+    assertNotNull(dq.peek());
+    assertEquals(1, dq.watcherCount());
+    assertFalse(dq.isDirty());
+    assertFalse(dq.peekElements(1, 1, Predicates.<String>alwaysTrue()).isEmpty());
+    assertEquals(1, dq.watcherCount());
+    assertFalse(dq.isDirty());
+  }
+
+  @Test
+  public void testLocallyOffer() throws Exception {
+    String dqZNode = "/distqueue/test";
+    DistributedQueue dq = makeDistributedQueue(dqZNode);
+    dq.peekElements(1, 1, Predicates.<String>alwaysTrue());
+    for (int i = 0; i < 100; i++) {
+      byte[] data = String.valueOf(i).getBytes(UTF8);
+      dq.offer(data);
+      assertNotNull(dq.peek());
+      dq.poll();
+      dq.peekElements(1, 1, Predicates.<String>alwaysTrue());
+    }
+  }
+
+
+  @Test
   public void testPeekElements() throws Exception {
     String dqZNode = "/distqueue/test";
     final byte[] data = "hello world".getBytes(UTF8);