You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2016/04/06 08:27:58 UTC
lucene-solr:master: SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a SolrResponse node is in the overseer queue
Repository: lucene-solr
Updated Branches:
refs/heads/master a5afd1cee -> 4205b1c80
SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a SolrResponse node is in the overseer queue
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/4205b1c8
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/4205b1c8
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/4205b1c8
Branch: refs/heads/master
Commit: 4205b1c8040935b4939300cf1676e0e006afec06
Parents: a5afd1c
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Wed Apr 6 11:56:27 2016 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Wed Apr 6 11:56:27 2016 +0530
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../apache/solr/cloud/OverseerTaskQueue.java | 21 ++++--
.../solr/cloud/OverseerTaskQueueTest.java | 67 ++++++++++++++++++++
3 files changed, 85 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4205b1c8/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index e9260fb..9793893 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -104,6 +104,9 @@ Bug Fixes
* SOLR-8875: SolrCloud Overseer clusterState could unexpectedly be null resulting in NPE.
(Scott Blum via David Smiley)
+* SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a
+ SolrResponse node is in the overseer queue. (Jessica Cheng Mallet via shalin)
+
Optimizations
----------------------
* SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4205b1c8/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index cf9d583..4cee814 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -61,7 +61,7 @@ public class OverseerTaskQueue extends DistributedQueue {
List<String> childNames = zookeeper.getChildren(dir, null, true);
stats.setQueueLength(childNames.size());
for (String childName : childNames) {
- if (childName != null) {
+ if (childName != null && childName.startsWith(PREFIX)) {
try {
byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
if (data != null) {
@@ -185,17 +185,14 @@ public class OverseerTaskQueue extends DistributedQueue {
try {
// Create and watch the response node before creating the request node;
// otherwise we may miss the response.
- String watchID = createData(
- dir + "/" + response_prefix,
- null, CreateMode.EPHEMERAL_SEQUENTIAL);
+ String watchID = createResponseNode();
Object lock = new Object();
LatchWatcher watcher = new LatchWatcher(lock);
Stat stat = zookeeper.exists(watchID, watcher, true);
// create the request node
- createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
- data, CreateMode.PERSISTENT);
+ createRequestNode(data, watchID);
synchronized (lock) {
if (stat != null && watcher.getWatchedEvent() == null) {
@@ -213,6 +210,18 @@ public class OverseerTaskQueue extends DistributedQueue {
}
}
+ void createRequestNode(byte[] data, String watchID) throws KeeperException, InterruptedException {
+ createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
+ data, CreateMode.PERSISTENT);
+ }
+
+ String createResponseNode() throws KeeperException, InterruptedException {
+ return createData(
+ dir + "/" + response_prefix,
+ null, CreateMode.EPHEMERAL_SEQUENTIAL);
+ }
+
+
public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, long waitMillis)
throws KeeperException, InterruptedException {
ArrayList<QueueEvent> topN = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4205b1c8/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
index 028f85f..95cdd40 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
@@ -16,6 +16,19 @@
*/
package org.apache.solr.cloud;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.response.SolrResponseBase;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.Utils;
+import org.junit.Test;
+
public class OverseerTaskQueueTest extends DistributedQueueTest {
@@ -25,4 +38,58 @@ public class OverseerTaskQueueTest extends DistributedQueueTest {
protected OverseerTaskQueue makeDistributedQueue(String dqZNode) throws Exception {
return new OverseerTaskQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
}
+
+ @Test
+ public void testContainsTaskWithRequestId() throws Exception {
+ String tqZNode = "/taskqueue/test";
+ String requestId = "foo";
+ String nonExistentRequestId = "bar";
+
+ OverseerTaskQueue tq = makeDistributedQueue(tqZNode);
+
+ // Basic ops
+ // Put an expected Overseer task onto the queue
+ final Map<String, Object> props = new HashMap<>();
+ props.put(CommonParams.NAME, "coll1");
+ props.put(OverseerCollectionMessageHandler.COLL_CONF, "myconf");
+ props.put(OverseerCollectionMessageHandler.NUM_SLICES, 1);
+ props.put(ZkStateReader.REPLICATION_FACTOR, 3);
+ props.put(CommonAdminParams.ASYNC, requestId);
+ tq.offer(Utils.toJSON(props));
+
+ assertTrue("Task queue should contain task with requestid " + requestId,
+ tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, requestId));
+
+ assertFalse("Task queue should not contain task with requestid " + nonExistentRequestId,
+ tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, nonExistentRequestId));
+
+ // Create a response node as if someone is waiting for a response from the Overseer; then,
+ // create the request node.
+ // Here we're reaching a bit into the internals of OverseerTaskQueue in order to create the same
+ // response node structure but without setting a watch on it and removing it immediately when
+ // a response is set, in order to artificially create the race condition that
+ // containsTaskWithRequestId runs while the response is still in the queue.
+ String watchID = tq.createResponseNode();
+ String requestId2 = "baz";
+ props.put(CommonAdminParams.ASYNC, requestId2);
+ tq.createRequestNode(Utils.toJSON(props), watchID);
+
+ // Set a SolrResponse as the response node by removing the QueueEvent, as done in OverseerTaskProcessor
+ List<OverseerTaskQueue.QueueEvent> queueEvents = tq.peekTopN(2, Collections.emptySet(), 1000);
+ OverseerTaskQueue.QueueEvent requestId2Event = null;
+ for (OverseerTaskQueue.QueueEvent queueEvent : queueEvents) {
+ Map<String, Object> eventProps = (Map<String, Object>) Utils.fromJSON(queueEvent.getBytes());
+ if (requestId2.equals(eventProps.get(CommonAdminParams.ASYNC))) {
+ requestId2Event = queueEvent;
+ break;
+ }
+ }
+ assertNotNull("Didn't find event with requestid " + requestId2, requestId2Event);
+ requestId2Event.setBytes(SolrResponse.serializable(new SolrResponseBase()));
+ tq.remove(requestId2Event);
+
+ // Make sure this call to check if requestId exists doesn't barf with Json parse exception
+ assertTrue("Task queue should contain task with requestid " + requestId,
+ tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, requestId));
+ }
}