You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (the link was lost in this plain-text conversion).
Posted to commits@lucene.apache.org by cp...@apache.org on 2017/05/09 10:42:24 UTC
[10/13] lucene-solr:jira/solr-8668: SOLR-10619: Optimize using cache for DistributedQueue in case of single-consumer
SOLR-10619: Optimize using cache for DistributedQueue in case of single-consumer
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/a24fa8d7
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/a24fa8d7
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/a24fa8d7
Branch: refs/heads/jira/solr-8668
Commit: a24fa8d7db5d880723cfc0eaa26d7ae320c4cbeb
Parents: 421611b
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue May 9 13:56:49 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue May 9 13:56:49 2017 +0700
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../org/apache/solr/cloud/DistributedQueue.java | 47 +++++++++++++-------
.../apache/solr/cloud/DistributedQueueTest.java | 36 +++++++++++++++
3 files changed, 68 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a24fa8d7/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index cdbbcc8..3d462d2 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -258,6 +258,8 @@ Optimizations
* SOLR-10524: Better ZkStateWriter batching (Cao Manh Dat, Noble Paul, shalin, Scott Blum)
+* SOLR-10619: Optimize using cache for DistributedQueue in case of single-consumer (Cao Manh Dat, Scott Blum)
+
Bug Fixes
----------------------
* SOLR-10281: ADMIN_PATHS is duplicated in two places and inconsistent. This can cause automatic
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a24fa8d7/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
index 6c28cc6..64120ed 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
@@ -43,7 +43,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A distributed queue.
+ * A distributed queue. Optimized for single-consumer,
+ * multiple-producer: if there are multiple consumers on the same ZK queue,
+ * the results should be correct but inefficient
*/
public class DistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -265,18 +267,16 @@ public class DistributedQueue {
* The caller must double check that the actual node still exists, since the in-memory
* list is inherently stale.
*/
- private String firstChild(boolean remove) throws KeeperException, InterruptedException {
+ private String firstChild(boolean remove, boolean refetchIfDirty) throws KeeperException, InterruptedException {
updateLock.lockInterruptibly();
try {
- if (!isDirty) {
- // If we're not in a dirty state...
- if (!knownChildren.isEmpty()) {
- // and we have in-memory children, return from in-memory.
- return remove ? knownChildren.pollFirst() : knownChildren.first();
- } else {
- // otherwise there's nothing to return
- return null;
- }
+ // We always return from cache first, the cache will be cleared if the node is not exist
+ if (!knownChildren.isEmpty() && !(isDirty && refetchIfDirty)) {
+ return remove ? knownChildren.pollFirst() : knownChildren.first();
+ }
+
+ if (!isDirty && knownChildren.isEmpty()) {
+ return null;
}
// Dirty, try to fetch an updated list of children from ZK.
@@ -332,9 +332,10 @@ public class DistributedQueue {
Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
List<String> foundChildren = new ArrayList<>();
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
+ boolean first = true;
while (true) {
- // Trigger a fetch if needed.
- firstChild(false);
+ // Trigger a refresh, but only force it if this is not the first iteration.
+ firstChild(false, !first);
updateLock.lockInterruptibly();
try {
@@ -349,6 +350,13 @@ public class DistributedQueue {
if (waitNanos <= 0) {
break;
}
+
+ // If this is our first time through, force a refresh before waiting.
+ if (first) {
+ first = false;
+ continue;
+ }
+
waitNanos = changed.awaitNanos(waitNanos);
} finally {
updateLock.unlock();
@@ -390,7 +398,7 @@ public class DistributedQueue {
*/
private byte[] firstElement() throws KeeperException, InterruptedException {
while (true) {
- String firstChild = firstChild(false);
+ String firstChild = firstChild(false, false);
if (firstChild == null) {
return null;
}
@@ -400,7 +408,9 @@ public class DistributedQueue {
// Another client deleted the node first, remove the in-memory and retry.
updateLock.lockInterruptibly();
try {
- knownChildren.remove(firstChild);
+ // Efficient only for single-consumer
+ knownChildren.clear();
+ isDirty = true;
} finally {
updateLock.unlock();
}
@@ -410,7 +420,7 @@ public class DistributedQueue {
private byte[] removeFirst() throws KeeperException, InterruptedException {
while (true) {
- String firstChild = firstChild(true);
+ String firstChild = firstChild(true, false);
if (firstChild == null) {
return null;
}
@@ -418,12 +428,15 @@ public class DistributedQueue {
String path = dir + "/" + firstChild;
byte[] result = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
+ stats.setQueueLength(knownChildren.size());
return result;
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first, remove the in-memory and retry.
updateLock.lockInterruptibly();
try {
- knownChildren.remove(firstChild);
+ // Efficient only for single-consumer
+ knownChildren.clear();
+ isDirty = true;
} finally {
updateLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a24fa8d7/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index d2d6a16..ed33dc1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -91,6 +91,35 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
}
@Test
+ public void testDistributedQueueCache() throws Exception {
+ String dqZNode = "/distqueue/test";
+ byte[] data = "hello world".getBytes(UTF8);
+ // One consumer and two producers, all backed by the same ZK queue node.
+ DistributedQueue consumer = makeDistributedQueue(dqZNode);
+ DistributedQueue producer = makeDistributedQueue(dqZNode);
+ DistributedQueue producer2 = makeDistributedQueue(dqZNode);
+ // Offer three items; the first poll() loads them into the consumer's cache and takes one.
+ producer2.offer(data);
+ producer.offer(data);
+ producer.offer(data);
+ consumer.poll();
+ // The queue-length stat is set from the consumer's in-memory cache size (knownChildren.size()).
+ assertEquals(2, consumer.getStats().getQueueLength());
+ producer.offer(data);
+ producer2.offer(data);
+ consumer.poll();
+ // Wait until the consumer is marked dirty, i.e. its watcher has fired after the new offers.
+ while (!consumer.isDirty()) { // NOTE(review): no timeout; this loops forever if the watcher never fires
+ Thread.sleep(20);
+ }
+ // The cache was non-empty when poll() ran, so it was served from cache without re-reading children from ZK.
+ assertEquals(1, consumer.getStats().getQueueLength());
+ consumer.poll();
+ consumer.peek();
+ assertEquals(2, consumer.getStats().getQueueLength()); // cache drained and dirty, so children are re-read from ZK: 5 offered - 3 polled = 2
+ }
+
+ @Test
public void testDistributedQueueBlocking() throws Exception {
String dqZNode = "/distqueue/test";
String testData = "hello world";
@@ -161,6 +190,13 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
// dirty and watcher state indeterminate here, race with watcher
Thread.sleep(100); // watcher should have fired now
assertNotNull(dq.peek());
+ // in case of race condition, childWatcher is kicked off after peek()
+ if (dq.watcherCount() == 0) {
+ assertTrue(dq.isDirty());
+ dq.poll();
+ dq.offer("hello world".getBytes(UTF8));
+ dq.peek();
+ }
assertEquals(1, dq.watcherCount());
assertFalse(dq.isDirty());
assertFalse(dq.peekElements(1, 1, s -> true).isEmpty());