You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by an...@apache.org on 2014/06/02 17:47:27 UTC

svn commit: r1599248 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/admin/ core/src/test/org/apache/solr/cloud/

Author: anshum
Date: Mon Jun  2 15:47:26 2014
New Revision: 1599248

URL: http://svn.apache.org/r1599248
Log:
SOLR-6026: Also check work-queue while processing a REQUESTSTATUS Collection API Call

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1599248&r1=1599247&r2=1599248&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Mon Jun  2 15:47:26 2014
@@ -179,6 +179,9 @@ Other Changes
 
 * SOLR-6116: Refactor DocRouter.getDocRouter to accept routerName as a String. (shalin)
 
+* SOLR-6026: REQUESTSTATUS Collection API now also checks for submitted tasks which are
+  yet to begin execution.
+
 Optimizations
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1599248&r1=1599247&r2=1599248&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Mon Jun  2 15:47:26 2014
@@ -22,6 +22,7 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.util.stats.TimerContext;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -118,7 +119,39 @@ public class DistributedQueue {
     
     return orderedChildren;
   }
-  
+
+
+  /**
+   * Returns true if the queue contains a task with the specified async id.
+   */
+  public boolean containsTaskWithRequestId(String requestId)
+      throws KeeperException, InterruptedException {
+
+    List<String> childNames = null;
+    try {
+      childNames = zookeeper.getChildren(dir, null, true);
+    } catch (KeeperException.NoNodeException e) {
+      throw e;
+    }
+
+    for (String childName : childNames) {
+      if (childName != null) {
+        try {
+          ZkNodeProps message = ZkNodeProps.load(zookeeper.getData(dir + "/" + childName, null, null, true));
+          if (message.containsKey(OverseerCollectionProcessor.ASYNC)) {
+            LOG.info(">>>> {}", message.get(OverseerCollectionProcessor.ASYNC));
+            if(message.get(OverseerCollectionProcessor.ASYNC).equals(requestId)) return true;
+          }
+        } catch (KeeperException.NoNodeException e) {
+          // Another client removed the node first, try next
+        }
+      }
+    }
+
+    return false;
+  }
+
+
   /**
    * Return the head of the queue without modifying the queue.
    * 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1599248&r1=1599247&r2=1599248&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Mon Jun  2 15:47:26 2014
@@ -51,6 +51,7 @@ import org.apache.solr.client.solrj.Solr
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
+import org.apache.solr.cloud.DistributedQueue;
 import org.apache.solr.cloud.DistributedQueue.QueueEvent;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.OverseerCollectionProcessor;
@@ -291,6 +292,11 @@ public class CollectionsHandler extends 
       } else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) {
         SimpleOrderedMap success = new SimpleOrderedMap();
         success.add("state", "running");
+        success.add("msg", "found " + requestId + " in running tasks");
+        results.add("status", success);
+      } else if(overseerCollectionQueueContains(requestId)){
+        SimpleOrderedMap success = new SimpleOrderedMap();
+        success.add("state", "submitted");
         success.add("msg", "found " + requestId + " in submitted tasks");
         results.add("status", success);
       } else {
@@ -305,6 +311,11 @@ public class CollectionsHandler extends 
     }
   }
 
+  private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException {
+    DistributedQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
+    return collectionQueue.containsTaskWithRequestId(asyncId);
+  }
+
   private void handleResponse(String operation, ZkNodeProps m,
                               SolrQueryResponse rsp) throws KeeperException, InterruptedException {
     handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
@@ -326,13 +337,13 @@ public class CollectionsHandler extends 
  
        if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
            coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
-           coreContainer.getZkController().getOverseerRunningMap().contains(asyncId)) {
+           coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
+           overseerCollectionQueueContains(asyncId)) {
          r.add("error", "Task with the same requestid already exists.");
  
        } else {
          coreContainer.getZkController().getOverseerCollectionQueue()
              .offer(ZkStateReader.toJSON(m));
- 
        }
        r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
        SolrResponse response = new OverseerSolrResponse(r);

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java?rev=1599248&r1=1599247&r2=1599248&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java Mon Jun  2 15:47:26 2014
@@ -55,7 +55,7 @@ public class AsyncMigrateRouteKeyTest ex
     params.set(OverseerCollectionProcessor.REQUESTID, asyncId);
     // This task takes long enough to run. Also check for the current state of the task to be running.
     message = sendStatusRequestWithRetry(params, 2);
-    assertEquals("found " + asyncId + " in submitted tasks", message);
+    assertEquals("found " + asyncId + " in running tasks", message);
     // Now wait until the task actually completes successfully/fails.
     message = sendStatusRequestWithRetry(params, 20);
     assertEquals("Task " + asyncId + " not found in completed tasks.",

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java?rev=1599248&r1=1599247&r2=1599248&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java Mon Jun  2 15:47:26 2014
@@ -64,6 +64,7 @@ public class MultiThreadedOCPTest extend
 
     testParallelCollectionAPICalls();
     testTaskExclusivity();
+    testDeduplicationOfSubmittedTasks();
     testLongAndShortRunningParallelApiCalls();
   }
 
@@ -139,6 +140,25 @@ public class MultiThreadedOCPTest extend
     }
   }
 
+  private void testDeduplicationOfSubmittedTasks() throws IOException, SolrServerException {
+    SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
+    CollectionAdminRequest.createCollection("ocptest_shardsplit2", 4, "conf1", server, "3000");
+
+    CollectionAdminRequest.splitShard("ocptest_shardsplit2", SHARD1, server, "3001");
+    CollectionAdminRequest.splitShard("ocptest_shardsplit2", SHARD2, server, "3002");
+
+    // Now submit another task with the same id. At this time, hopefully the previous 3002 should still be in the queue.
+    CollectionAdminResponse response = CollectionAdminRequest.splitShard("ocptest_shardsplit2", SHARD1, server, "3002");
+    NamedList r = response.getResponse();
+    assertEquals("Duplicate request was supposed to exist but wasn't found. De-duplication of submitted task failed.",
+        "Task with the same requestid already exists.", r.get("error"));
+
+    for (int i=3001;i<=3002;i++) {
+      String state = getRequestStateAfterCompletion(i + "", 30, server);
+      assertTrue("Task " + i + " did not complete, final state: " + state,state.equals("completed"));
+    }
+  }
+
   private void testLongAndShortRunningParallelApiCalls() throws InterruptedException, IOException, SolrServerException {
 
     Thread indexThread = new Thread() {
@@ -158,17 +178,14 @@ public class MultiThreadedOCPTest extend
     indexThread.start();
 
     try {
-      Thread.sleep(5000);
 
       SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
       CollectionAdminRequest.splitShard("collection1", SHARD1, server, "2000");
 
       String state = getRequestState("2000", server);
-      while (!state.equals("running")) {
+      while (state.equals("submitted")) {
         state = getRequestState("2000", server);
-        if (state.equals("completed") || state.equals("failed"))
-          break;
-        Thread.sleep(100);
+        Thread.sleep(10);
       }
       assertTrue("SplitShard task [2000] was supposed to be in [running] but isn't. It is [" + state + "]", state.equals("running"));