You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dr...@apache.org on 2016/06/07 22:35:33 UTC
lucene-solr:SOLR-9191: SOLR-9191: Add test for DistributedQueue.peekElements()
Repository: lucene-solr
Updated Branches:
refs/heads/SOLR-9191 23c0edb15 -> 024a733b7
SOLR-9191: Add test for DistributedQueue.peekElements()
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/024a733b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/024a733b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/024a733b
Branch: refs/heads/SOLR-9191
Commit: 024a733b7f561cfbc1b3e557fd0ab9d69aa6a9c9
Parents: 23c0edb
Author: Scott Blum <dr...@apache.org>
Authored: Tue Jun 7 18:34:48 2016 -0400
Committer: Scott Blum <dr...@apache.org>
Committed: Tue Jun 7 18:34:48 2016 -0400
----------------------------------------------------------------------
.../org/apache/solr/cloud/DistributedQueue.java | 11 +++--
.../apache/solr/cloud/DistributedQueueTest.java | 44 +++++++++++++++++++-
2 files changed, 50 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/024a733b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
index 5d4fd87..afed6f1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
@@ -323,7 +323,7 @@ public class DistributedQueue {
Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Function<String, Boolean> acceptFilter) throws KeeperException, InterruptedException {
List<String> foundChildren = new ArrayList<>();
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
- while (waitNanos > 0) {
+ while (true) {
// Trigger a fetch if needed.
firstChild(false);
@@ -337,6 +337,9 @@ public class DistributedQueue {
if (!foundChildren.isEmpty()) {
break;
}
+ if (waitNanos <= 0) {
+ break;
+ }
waitNanos = changed.awaitNanos(waitNanos);
} finally {
updateLock.unlock();
@@ -352,12 +355,12 @@ public class DistributedQueue {
// much more sophisticated waitNanos tracking.
List<Pair<String, byte[]>> result = new ArrayList<>();
for (String child : foundChildren) {
+ if (result.size() >= max) {
+ break;
+ }
try {
byte[] data = zookeeper.getData(dir + "/" + child, null, null, true);
result.add(new Pair<>(child, data));
- if (result.size() >= max) {
- break;
- }
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first, remove the in-memory and continue.
updateLock.lockInterruptibly();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/024a733b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index 840e7e5..f42f101 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -19,7 +19,6 @@ package org.apache.solr.cloud;
import java.nio.charset.Charset;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -137,6 +136,49 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
assertNull(dq.poll());
}
+ @Test
+ public void testPeekElements() throws Exception {
+ String dqZNode = "/distqueue/test";
+ byte[] data = "hello world".getBytes(UTF8);
+
+ DistributedQueue dq = makeDistributedQueue(dqZNode);
+
+ // Populate with data.
+ dq.offer(data);
+ dq.offer(data);
+ dq.offer(data);
+
+ // Should be able to get 0, 1, 2, or 3 instantly
+ for (int i = 0; i <= 3; ++i) {
+ assertEquals(i, dq.peekElements(i, 0, child -> true).size());
+ }
+
+ // Asking for more should return only 3.
+ assertEquals(3, dq.peekElements(4, 0, child -> true).size());
+
+ // If we filter everything out, we should block for the full time.
+ long start = System.nanoTime();
+ assertEquals(0, dq.peekElements(4, 1000, child -> false).size());
+ assertTrue(System.nanoTime() - start >= TimeUnit.MILLISECONDS.toNanos(500));
+
+ // If someone adds a new matching element while we're waiting, we should return immediately.
+ executor.submit(() -> {
+ try {
+ Thread.sleep(500);
+ dq.offer(data);
+ } catch (Exception e) {
+ // ignore
+ }
+ });
+ start = System.nanoTime();
+ assertEquals(1, dq.peekElements(4, 2000, child -> {
+ // The 4th element in the queue will end with a "3".
+ return child.endsWith("3");
+ }).size());
+ assertTrue(System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(1000));
+ assertTrue(System.nanoTime() - start >= TimeUnit.MILLISECONDS.toNanos(250));
+ }
+
private void forceSessionExpire() throws InterruptedException, TimeoutException {
long sessionId = zkClient.getSolrZooKeeper().getSessionId();
zkServer.expire(sessionId);