You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by an...@apache.org on 2014/06/02 17:47:27 UTC
svn commit: r1599248 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/cloud/
core/src/java/org/apache/solr/handler/admin/
core/src/test/org/apache/solr/cloud/
Author: anshum
Date: Mon Jun 2 15:47:26 2014
New Revision: 1599248
URL: http://svn.apache.org/r1599248
Log:
SOLR-6026: Also check work-queue while processing a REQUESTSTATUS Collection API Call
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1599248&r1=1599247&r2=1599248&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Mon Jun 2 15:47:26 2014
@@ -179,6 +179,9 @@ Other Changes
* SOLR-6116: Refactor DocRouter.getDocRouter to accept routerName as a String. (shalin)
+* SOLR-6026: REQUESTSTATUS Collection API now also checks for submitted tasks which are
+ yet to begin execution.
+
Optimizations
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1599248&r1=1599247&r2=1599248&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Mon Jun 2 15:47:26 2014
@@ -22,6 +22,7 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -118,7 +119,39 @@ public class DistributedQueue {
return orderedChildren;
}
-
+
+
+ /**
+ * Returns true if the queue contains a task with the specified async id.
+ */
+ public boolean containsTaskWithRequestId(String requestId)
+ throws KeeperException, InterruptedException {
+
+ List<String> childNames = null;
+ try {
+ childNames = zookeeper.getChildren(dir, null, true);
+ } catch (KeeperException.NoNodeException e) {
+ throw e;
+ }
+
+ for (String childName : childNames) {
+ if (childName != null) {
+ try {
+ ZkNodeProps message = ZkNodeProps.load(zookeeper.getData(dir + "/" + childName, null, null, true));
+ if (message.containsKey(OverseerCollectionProcessor.ASYNC)) {
+ LOG.info(">>>> {}", message.get(OverseerCollectionProcessor.ASYNC));
+ if(message.get(OverseerCollectionProcessor.ASYNC).equals(requestId)) return true;
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // Another client removed the node first, try next
+ }
+ }
+ }
+
+ return false;
+ }
+
+
/**
* Return the head of the queue without modifying the queue.
*
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1599248&r1=1599247&r2=1599248&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Mon Jun 2 15:47:26 2014
@@ -51,6 +51,7 @@ import org.apache.solr.client.solrj.Solr
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
+import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
@@ -291,6 +292,11 @@ public class CollectionsHandler extends
} else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) {
SimpleOrderedMap success = new SimpleOrderedMap();
success.add("state", "running");
+ success.add("msg", "found " + requestId + " in running tasks");
+ results.add("status", success);
+ } else if(overseerCollectionQueueContains(requestId)){
+ SimpleOrderedMap success = new SimpleOrderedMap();
+ success.add("state", "submitted");
success.add("msg", "found " + requestId + " in submitted tasks");
results.add("status", success);
} else {
@@ -305,6 +311,11 @@ public class CollectionsHandler extends
}
}
+  /**
+   * Checks whether the overseer collection queue currently holds a task with the given async id.
+   *
+   * @param asyncId the async request id to look for
+   * @return the result of {@code containsTaskWithRequestId(asyncId)} on the overseer collection queue
+   * @throws KeeperException propagated from the queue lookup
+   * @throws InterruptedException propagated from the queue lookup
+   */
+  private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException {
+    return coreContainer.getZkController()
+        .getOverseerCollectionQueue()
+        .containsTaskWithRequestId(asyncId);
+  }
+
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
@@ -326,13 +337,13 @@ public class CollectionsHandler extends
if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
- coreContainer.getZkController().getOverseerRunningMap().contains(asyncId)) {
+ coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
+ overseerCollectionQueueContains(asyncId)) {
r.add("error", "Task with the same requestid already exists.");
} else {
coreContainer.getZkController().getOverseerCollectionQueue()
.offer(ZkStateReader.toJSON(m));
-
}
r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
SolrResponse response = new OverseerSolrResponse(r);
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java?rev=1599248&r1=1599247&r2=1599248&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java Mon Jun 2 15:47:26 2014
@@ -55,7 +55,7 @@ public class AsyncMigrateRouteKeyTest ex
params.set(OverseerCollectionProcessor.REQUESTID, asyncId);
// This task takes long enough to run. Also check for the current state of the task to be running.
message = sendStatusRequestWithRetry(params, 2);
- assertEquals("found " + asyncId + " in submitted tasks", message);
+ assertEquals("found " + asyncId + " in running tasks", message);
// Now wait until the task actually completes successfully/fails.
message = sendStatusRequestWithRetry(params, 20);
assertEquals("Task " + asyncId + " not found in completed tasks.",
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java?rev=1599248&r1=1599247&r2=1599248&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java Mon Jun 2 15:47:26 2014
@@ -64,6 +64,7 @@ public class MultiThreadedOCPTest extend
testParallelCollectionAPICalls();
testTaskExclusivity();
+ testDeduplicationOfSubmittedTasks();
testLongAndShortRunningParallelApiCalls();
}
@@ -139,6 +140,25 @@ public class MultiThreadedOCPTest extend
}
}
+ private void testDeduplicationOfSubmittedTasks() throws IOException, SolrServerException {
+ SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
+ CollectionAdminRequest.createCollection("ocptest_shardsplit2", 4, "conf1", server, "3000");
+
+ CollectionAdminRequest.splitShard("ocptest_shardsplit2", SHARD1, server, "3001");
+ CollectionAdminRequest.splitShard("ocptest_shardsplit2", SHARD2, server, "3002");
+
+ // Now submit another task with the same id. At this time, hopefully the previous 2002 should still be in the queue.
+ CollectionAdminResponse response = CollectionAdminRequest.splitShard("ocptest_shardsplit2", SHARD1, server, "3002");
+ NamedList r = response.getResponse();
+ assertEquals("Duplicate request was supposed to exist but wasn't found. De-duplication of submitted task failed.",
+ "Task with the same requestid already exists.", r.get("error"));
+
+ for (int i=3001;i<=3002;i++) {
+ String state = getRequestStateAfterCompletion(i + "", 30, server);
+ assertTrue("Task " + i + " did not complete, final state: " + state,state.equals("completed"));
+ }
+ }
+
private void testLongAndShortRunningParallelApiCalls() throws InterruptedException, IOException, SolrServerException {
Thread indexThread = new Thread() {
@@ -158,17 +178,14 @@ public class MultiThreadedOCPTest extend
indexThread.start();
try {
- Thread.sleep(5000);
SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
CollectionAdminRequest.splitShard("collection1", SHARD1, server, "2000");
String state = getRequestState("2000", server);
- while (!state.equals("running")) {
+ while (state.equals("submitted")) {
state = getRequestState("2000", server);
- if (state.equals("completed") || state.equals("failed"))
- break;
- Thread.sleep(100);
+ Thread.sleep(10);
}
assertTrue("SplitShard task [2000] was supposed to be in [running] but isn't. It is [" + state + "]", state.equals("running"));