You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by cp...@apache.org on 2017/05/09 10:42:24 UTC

[10/13] lucene-solr:jira/solr-8668: SOLR-10619: Optimize using cache for DistributedQueue in case of single-consumer

SOLR-10619: Optimize using cache for DistributedQueue in case of single-consumer


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/a24fa8d7
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/a24fa8d7
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/a24fa8d7

Branch: refs/heads/jira/solr-8668
Commit: a24fa8d7db5d880723cfc0eaa26d7ae320c4cbeb
Parents: 421611b
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue May 9 13:56:49 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue May 9 13:56:49 2017 +0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  2 +
 .../org/apache/solr/cloud/DistributedQueue.java | 47 +++++++++++++-------
 .../apache/solr/cloud/DistributedQueueTest.java | 36 +++++++++++++++
 3 files changed, 68 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a24fa8d7/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index cdbbcc8..3d462d2 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -258,6 +258,8 @@ Optimizations
 
 * SOLR-10524: Better ZkStateWriter batching (Cao Manh Dat, Noble Paul, shalin, Scott Blum)
 
+* SOLR-10619: Optimize using cache for DistributedQueue in case of single-consumer (Cao Manh Dat, Scott Blum)
+
 Bug Fixes
 ----------------------
 * SOLR-10281: ADMIN_PATHS is duplicated in two places and inconsistent. This can cause automatic

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a24fa8d7/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
index 6c28cc6..64120ed 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
@@ -43,7 +43,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A distributed queue.
+ * A distributed queue, optimized for a single consumer and multiple producers.
+ * If multiple consumers share the same ZK queue, results should still be
+ * correct, but less efficient.
  */
 public class DistributedQueue {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -265,18 +267,16 @@ public class DistributedQueue {
    * The caller must double check that the actual node still exists, since the in-memory
    * list is inherently stale.
    */
-  private String firstChild(boolean remove) throws KeeperException, InterruptedException {
+  private String firstChild(boolean remove, boolean refetchIfDirty) throws KeeperException, InterruptedException {
     updateLock.lockInterruptibly();
     try {
-      if (!isDirty) {
-        // If we're not in a dirty state...
-        if (!knownChildren.isEmpty()) {
-          // and we have in-memory children, return from in-memory.
-          return remove ? knownChildren.pollFirst() : knownChildren.first();
-        } else {
-          // otherwise there's nothing to return
-          return null;
-        }
+      // Serve from the cache first (unless a dirty refetch was requested); the cache is cleared if a cached node no longer exists.
+      if (!knownChildren.isEmpty() && !(isDirty && refetchIfDirty)) {
+        return remove ? knownChildren.pollFirst() : knownChildren.first();
+      }
+
+      if (!isDirty && knownChildren.isEmpty()) {
+        return null;
       }
 
       // Dirty, try to fetch an updated list of children from ZK.
@@ -332,9 +332,10 @@ public class DistributedQueue {
   Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
     List<String> foundChildren = new ArrayList<>();
     long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
+    boolean first = true;
     while (true) {
-      // Trigger a fetch if needed.
-      firstChild(false);
+      // Trigger a refresh, but only force it if this is not the first iteration.
+      firstChild(false, !first);
 
       updateLock.lockInterruptibly();
       try {
@@ -349,6 +350,13 @@ public class DistributedQueue {
         if (waitNanos <= 0) {
           break;
         }
+
+        // If this is our first time through, force a refresh before waiting.
+        if (first) {
+          first = false;
+          continue;
+        }
+
         waitNanos = changed.awaitNanos(waitNanos);
       } finally {
         updateLock.unlock();
@@ -390,7 +398,7 @@ public class DistributedQueue {
    */
   private byte[] firstElement() throws KeeperException, InterruptedException {
     while (true) {
-      String firstChild = firstChild(false);
+      String firstChild = firstChild(false, false);
       if (firstChild == null) {
         return null;
       }
@@ -400,7 +408,9 @@ public class DistributedQueue {
         // Another client deleted the node first, remove the in-memory and retry.
         updateLock.lockInterruptibly();
         try {
-          knownChildren.remove(firstChild);
+          // Efficient only for single-consumer
+          knownChildren.clear();
+          isDirty = true;
         } finally {
           updateLock.unlock();
         }
@@ -410,7 +420,7 @@ public class DistributedQueue {
 
   private byte[] removeFirst() throws KeeperException, InterruptedException {
     while (true) {
-      String firstChild = firstChild(true);
+      String firstChild = firstChild(true, false);
       if (firstChild == null) {
         return null;
       }
@@ -418,12 +428,15 @@ public class DistributedQueue {
         String path = dir + "/" + firstChild;
         byte[] result = zookeeper.getData(path, null, null, true);
         zookeeper.delete(path, -1, true);
+        stats.setQueueLength(knownChildren.size());
         return result;
       } catch (KeeperException.NoNodeException e) {
         // Another client deleted the node first, remove the in-memory and retry.
         updateLock.lockInterruptibly();
         try {
-          knownChildren.remove(firstChild);
+          // Efficient only for single-consumer
+          knownChildren.clear();
+          isDirty = true;
         } finally {
           updateLock.unlock();
         }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a24fa8d7/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index d2d6a16..ed33dc1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -91,6 +91,35 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
   }
 
   @Test
+  public void testDistributedQueueCache() throws Exception {
+    String dqZNode = "/distqueue/test";
+    byte[] data = "hello world".getBytes(UTF8);
+
+    DistributedQueue consumer = makeDistributedQueue(dqZNode);
+    DistributedQueue producer = makeDistributedQueue(dqZNode);
+    DistributedQueue producer2 = makeDistributedQueue(dqZNode);
+
+    producer2.offer(data);
+    producer.offer(data);
+    producer.offer(data);
+    consumer.poll();
+
+    assertEquals(2, consumer.getStats().getQueueLength());
+    producer.offer(data);
+    producer2.offer(data);
+    consumer.poll();
+    // Wait for the child watcher to fire and mark the consumer dirty
+    while (!consumer.isDirty()) {
+      Thread.sleep(20);
+    }
+    // The consumer's cache still holds elements, so it should not re-fetch child paths from ZK
+    assertEquals(1, consumer.getStats().getQueueLength());
+    consumer.poll();
+    consumer.peek();
+    assertEquals(2, consumer.getStats().getQueueLength());
+  }
+
+  @Test
   public void testDistributedQueueBlocking() throws Exception {
     String dqZNode = "/distqueue/test";
     String testData = "hello world";
@@ -161,6 +190,13 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
     // dirty and watcher state indeterminate here, race with watcher
     Thread.sleep(100); // watcher should have fired now
     assertNotNull(dq.peek());
+    // Race: the child watcher may fire after peek(); if so (queue dirty), poll, re-offer and peek() again
+    if (dq.watcherCount() == 0) {
+      assertTrue(dq.isDirty());
+      dq.poll();
+      dq.offer("hello world".getBytes(UTF8));
+      dq.peek();
+    }
     assertEquals(1, dq.watcherCount());
     assertFalse(dq.isDirty());
     assertFalse(dq.peekElements(1, 1, s -> true).isEmpty());