You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dr...@apache.org on 2016/06/07 22:35:33 UTC

lucene-solr:SOLR-9191: SOLR-9191: Add test for DistributedQueue.peekElements()

Repository: lucene-solr
Updated Branches:
  refs/heads/SOLR-9191 23c0edb15 -> 024a733b7


SOLR-9191: Add test for DistributedQueue.peekElements()


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/024a733b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/024a733b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/024a733b

Branch: refs/heads/SOLR-9191
Commit: 024a733b7f561cfbc1b3e557fd0ab9d69aa6a9c9
Parents: 23c0edb
Author: Scott Blum <dr...@apache.org>
Authored: Tue Jun 7 18:34:48 2016 -0400
Committer: Scott Blum <dr...@apache.org>
Committed: Tue Jun 7 18:34:48 2016 -0400

----------------------------------------------------------------------
 .../org/apache/solr/cloud/DistributedQueue.java | 11 +++--
 .../apache/solr/cloud/DistributedQueueTest.java | 44 +++++++++++++++++++-
 2 files changed, 50 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/024a733b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
index 5d4fd87..afed6f1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
@@ -323,7 +323,7 @@ public class DistributedQueue {
   Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Function<String, Boolean> acceptFilter) throws KeeperException, InterruptedException {
     List<String> foundChildren = new ArrayList<>();
     long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
-    while (waitNanos > 0) {
+    while (true) {
       // Trigger a fetch if needed.
       firstChild(false);
 
@@ -337,6 +337,9 @@ public class DistributedQueue {
         if (!foundChildren.isEmpty()) {
           break;
         }
+        if (waitNanos <= 0) {
+          break;
+        }
         waitNanos = changed.awaitNanos(waitNanos);
       } finally {
         updateLock.unlock();
@@ -352,12 +355,12 @@ public class DistributedQueue {
     // much more sophisticated waitNanos tracking.
     List<Pair<String, byte[]>> result = new ArrayList<>();
     for (String child : foundChildren) {
+      if (result.size() >= max) {
+        break;
+      }
       try {
         byte[] data = zookeeper.getData(dir + "/" + child, null, null, true);
         result.add(new Pair<>(child, data));
-        if (result.size() >= max) {
-          break;
-        }
       } catch (KeeperException.NoNodeException e) {
         // Another client deleted the node first, remove the in-memory and continue.
         updateLock.lockInterruptibly();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/024a733b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index 840e7e5..f42f101 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -19,7 +19,6 @@ package org.apache.solr.cloud;
 import java.nio.charset.Charset;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -137,6 +136,49 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
     assertNull(dq.poll());
   }
 
+  @Test
+  public void testPeekElements() throws Exception {
+    String dqZNode = "/distqueue/test";
+    byte[] data = "hello world".getBytes(UTF8);
+
+    DistributedQueue dq = makeDistributedQueue(dqZNode);
+
+    // Populate the queue with three identical elements.
+    dq.offer(data);
+    dq.offer(data);
+    dq.offer(data);
+
+    // With a zero wait and an accept-all filter, asking for 0, 1, 2, or 3 elements should return exactly that many.
+    for (int i = 0; i <= 3; ++i) {
+      assertEquals(i, dq.peekElements(i, 0, child -> true).size());
+    }
+
+    // Asking for more elements than were offered should return only the 3 that exist.
+    assertEquals(3, dq.peekElements(4, 0, child -> true).size());
+
+    // If the filter rejects everything, we should block for the wait time (1000 ms requested; only >= 500 ms is asserted, presumably to tolerate timer slop).
+    long start = System.nanoTime();
+    assertEquals(0, dq.peekElements(4, 1000, child -> false).size());
+    assertTrue(System.nanoTime() - start >= TimeUnit.MILLISECONDS.toNanos(500));
+
+    // If a matching element is offered while we're waiting (here after a 500 ms sleep), we should return well before the 2000 ms wait expires.
+    executor.submit(() -> {
+      try {
+        Thread.sleep(500);
+        dq.offer(data);
+      } catch (Exception e) {
+        // ignore: if the offer fails, the peek below returns nothing and its size assertion fails the test
+      }
+    });
+    start = System.nanoTime();
+    assertEquals(1, dq.peekElements(4, 2000, child -> {
+      // The 4th element in the queue will end with a "3" (presumably a zero-based sequence suffix on the child name; the first three must not match).
+      return child.endsWith("3");
+    }).size());
+    assertTrue(System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(1000));
+    assertTrue(System.nanoTime() - start >= TimeUnit.MILLISECONDS.toNanos(250));
+  }
+
   private void forceSessionExpire() throws InterruptedException, TimeoutException {
     long sessionId = zkClient.getSolrZooKeeper().getSessionId();
     zkServer.expire(sessionId);