You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2016/04/06 08:27:58 UTC

lucene-solr:master: SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a SolrResponse node is in the overseer queue

Repository: lucene-solr
Updated Branches:
  refs/heads/master a5afd1cee -> 4205b1c80


SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a SolrResponse node is in the overseer queue


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/4205b1c8
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/4205b1c8
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/4205b1c8

Branch: refs/heads/master
Commit: 4205b1c8040935b4939300cf1676e0e006afec06
Parents: a5afd1c
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Wed Apr 6 11:56:27 2016 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Wed Apr 6 11:56:27 2016 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  3 +
 .../apache/solr/cloud/OverseerTaskQueue.java    | 21 ++++--
 .../solr/cloud/OverseerTaskQueueTest.java       | 67 ++++++++++++++++++++
 3 files changed, 85 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4205b1c8/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index e9260fb..9793893 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -104,6 +104,9 @@ Bug Fixes
 * SOLR-8875: SolrCloud Overseer clusterState could unexpectedly be null resulting in NPE.
   (Scott Blum via David Smiley)
 
+* SOLR-8948: OverseerTaskQueue.containsTaskWithRequestId encounters json parse error if a
+  SolrResponse node is in the overseer queue. (Jessica Cheng Mallet via shalin)
+
 Optimizations
 ----------------------
 * SOLR-8722: Don't force a full ZkStateReader refresh on every Overseer operation.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4205b1c8/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index cf9d583..4cee814 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -61,7 +61,7 @@ public class OverseerTaskQueue extends DistributedQueue {
     List<String> childNames = zookeeper.getChildren(dir, null, true);
     stats.setQueueLength(childNames.size());
     for (String childName : childNames) {
-      if (childName != null) {
+      if (childName != null && childName.startsWith(PREFIX)) {
         try {
           byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
           if (data != null) {
@@ -185,17 +185,14 @@ public class OverseerTaskQueue extends DistributedQueue {
     try {
       // Create and watch the response node before creating the request node;
       // otherwise we may miss the response.
-      String watchID = createData(
-          dir + "/" + response_prefix,
-          null, CreateMode.EPHEMERAL_SEQUENTIAL);
+      String watchID = createResponseNode();
 
       Object lock = new Object();
       LatchWatcher watcher = new LatchWatcher(lock);
       Stat stat = zookeeper.exists(watchID, watcher, true);
 
       // create the request node
-      createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
-          data, CreateMode.PERSISTENT);
+      createRequestNode(data, watchID);
 
       synchronized (lock) {
         if (stat != null && watcher.getWatchedEvent() == null) {
@@ -213,6 +210,18 @@ public class OverseerTaskQueue extends DistributedQueue {
     }
   }
 
+  void createRequestNode(byte[] data, String watchID) throws KeeperException, InterruptedException {
+    createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
+        data, CreateMode.PERSISTENT);
+  }
+
+  String createResponseNode() throws KeeperException, InterruptedException {
+    return createData(
+            dir + "/" + response_prefix,
+            null, CreateMode.EPHEMERAL_SEQUENTIAL);
+  }
+
+
   public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, long waitMillis)
       throws KeeperException, InterruptedException {
     ArrayList<QueueEvent> topN = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4205b1c8/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
index 028f85f..95cdd40 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
@@ -16,6 +16,19 @@
  */
 package org.apache.solr.cloud;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.response.SolrResponseBase;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.Utils;
+import org.junit.Test;
+
 public class OverseerTaskQueueTest extends DistributedQueueTest {
 
 
@@ -25,4 +38,58 @@ public class OverseerTaskQueueTest extends DistributedQueueTest {
   protected OverseerTaskQueue makeDistributedQueue(String dqZNode) throws Exception {
     return new OverseerTaskQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
   }
+
+  @Test
+  public void testContainsTaskWithRequestId() throws Exception {
+    String tqZNode = "/taskqueue/test";
+    String requestId = "foo";
+    String nonExistentRequestId = "bar";
+
+    OverseerTaskQueue tq = makeDistributedQueue(tqZNode);
+
+    // Basic ops
+    // Put an expected Overseer task onto the queue
+    final Map<String, Object> props = new HashMap<>();
+    props.put(CommonParams.NAME, "coll1");
+    props.put(OverseerCollectionMessageHandler.COLL_CONF, "myconf");
+    props.put(OverseerCollectionMessageHandler.NUM_SLICES, 1);
+    props.put(ZkStateReader.REPLICATION_FACTOR, 3);
+    props.put(CommonAdminParams.ASYNC, requestId);
+    tq.offer(Utils.toJSON(props));
+
+    assertTrue("Task queue should contain task with requestid " + requestId,
+        tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, requestId));
+
+    assertFalse("Task queue should not contain task with requestid " + nonExistentRequestId,
+        tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, nonExistentRequestId));
+
+    // Create a response node as if someone is waiting for a response from the Overseer; then,
+    // create the request node.
+    // Here we're reaching a bit into the internals of OverseerTaskQueue in order to create the same
+    // response node structure but without setting a watch on it and removing it immediately when
+    // a response is set, in order to artificially create the race condition that
+    // containsTaskWithRequestId runs while the response is still in the queue.
+    String watchID = tq.createResponseNode();
+    String requestId2 = "baz";
+    props.put(CommonAdminParams.ASYNC, requestId2);
+    tq.createRequestNode(Utils.toJSON(props), watchID);
+
+    // Set a SolrResponse as the response node by removing the QueueEvent, as done in OverseerTaskProcessor
+    List<OverseerTaskQueue.QueueEvent> queueEvents = tq.peekTopN(2, Collections.emptySet(), 1000);
+    OverseerTaskQueue.QueueEvent requestId2Event = null;
+    for (OverseerTaskQueue.QueueEvent queueEvent : queueEvents) {
+      Map<String, Object> eventProps = (Map<String, Object>) Utils.fromJSON(queueEvent.getBytes());
+      if (requestId2.equals(eventProps.get(CommonAdminParams.ASYNC))) {
+        requestId2Event = queueEvent;
+        break;
+      }
+    }
+    assertNotNull("Didn't find event with requestid " + requestId2, requestId2Event);
+    requestId2Event.setBytes(SolrResponse.serializable(new SolrResponseBase()));
+    tq.remove(requestId2Event);
+
+    // Make sure this call to check if requestId exists doesn't barf with Json parse exception
+    assertTrue("Task queue should contain task with requestid " + requestId,
+        tq.containsTaskWithRequestId(CommonAdminParams.ASYNC, requestId));
+  }
 }