You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by an...@apache.org on 2014/03/17 22:29:03 UTC

svn commit: r1578598 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/core/ solr/core/src/java/org/apache/solr/handler/admin/ solr/core/src/test/org/apache/solr/cloud/...

Author: anshum
Date: Mon Mar 17 21:29:03 2014
New Revision: 1578598

URL: http://svn.apache.org/r1578598
Log:
SOLR-5477: Async execution of OverseerCollectionProcessor tasks (merged trunk r1577444, r1577801, r1577965)

Added:
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
      - copied unchanged from r1577444, lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java
      - copied unchanged from r1577444, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
      - copied unchanged from r1577444, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java
      - copied unchanged from r1577444, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/handler/admin/CoreAdminRequestStatusTest.java
      - copied unchanged from r1577444, lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/admin/CoreAdminRequestStatusTest.java
Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/CHANGES.txt
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
    lucene/dev/branches/branch_4x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java

Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Mon Mar 17 21:29:03 2014
@@ -70,12 +70,14 @@ New Features
 * SOLR-5720: Add ExpandComponent to expand results collapsed by the 
   CollapsingQParserPlugin. (Joel Bernstein)
 
-
 * SOLR-3177: Enable tagging and excluding filters in StatsComponent via the
   localParams syntax. (Mathias H., Nikolai Luthman, Vitaliy Zhovtyuk, shalin)
   
 * SOLR-1604: Wildcards, ORs etc inside Phrase Queries. (Ahmet Arslan via Erick Erickson)
 
+* SOLR-5477: Async execution of OverseerCollectionProcessor (Collections API)
+  tasks. (Anshum Gupta)
+
 Bug Fixes
 ----------------------
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Mar 17 21:29:03 2014
@@ -81,6 +81,13 @@ public class Overseer {
     //Internal queue where overseer stores events that have not yet been published into cloudstate
     //If Overseer dies while extracting the main queue a new overseer will start from this queue 
     private final DistributedQueue workQueue;
+    // Internal map which holds the information about running tasks.
+    private final DistributedMap runningMap;
+    // Internal map which holds the information about successfully completed tasks.
+    private final DistributedMap completedMap;
+    // Internal map which holds the information about failed tasks.
+    private final DistributedMap failureMap;
+
     private Map clusterProps;
     private boolean isClosed = false;
 
@@ -88,6 +95,9 @@ public class Overseer {
       this.zkClient = reader.getZkClient();
       this.stateUpdateQueue = getInQueue(zkClient);
       this.workQueue = getInternalQueue(zkClient);
+      this.failureMap = getFailureMap(zkClient);
+      this.runningMap = getRunningMap(zkClient);
+      this.completedMap = getCompletedMap(zkClient);
       this.myId = myId;
       this.reader = reader;
       clusterProps = reader.getClusterProps();
@@ -135,7 +145,7 @@ public class Overseer {
                   }
                   zkClient.setData(ZkStateReader.CLUSTER_STATE,
                       ZkStateReader.toJSON(clusterState), true);
-                  
+
                   workQueue.poll(); // poll-ing removes the element we got by peek-ing
                 }
                 else {
@@ -1135,6 +1145,24 @@ public class Overseer {
     createOverseerNode(zkClient);
     return new DistributedQueue(zkClient, "/overseer/queue-work", null);
   }
+
+  /* Internal map for running tasks, not to be used outside of the Overseer */
+  static DistributedMap getRunningMap(final SolrZkClient zkClient) {
+    createOverseerNode(zkClient);
+    return new DistributedMap(zkClient, "/overseer/collection-map-running", null);
+  }
+
+  /* Internal map for successfully completed tasks, not to be used outside of the Overseer */
+  static DistributedMap getCompletedMap(final SolrZkClient zkClient) {
+    createOverseerNode(zkClient);
+    return new DistributedMap(zkClient, "/overseer/collection-map-completed", null);
+  }
+
+  /* Internal map for failed tasks, not to be used outside of the Overseer */
+  static DistributedMap getFailureMap(final SolrZkClient zkClient) {
+    createOverseerNode(zkClient);
+    return new DistributedMap(zkClient, "/overseer/collection-map-failure", null);
+  }
   
   /* Collection creation queue */
   static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Mon Mar 17 21:29:03 2014
@@ -61,10 +61,8 @@ import org.apache.solr.handler.component
 import org.apache.solr.handler.component.ShardResponse;
 import org.apache.solr.update.SolrIndexSplitter;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +80,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.solr.cloud.Assign.Node;
@@ -118,16 +117,22 @@ public class OverseerCollectionProcessor
 
   public static final String DELETESHARD = "deleteshard";
 
+  public static final String REQUESTSTATUS = "status";
+
   public static final String ROUTER = "router";
 
   public static final String SHARDS_PROP = "shards";
 
+  public static final String ASYNC = "async";
+
   public static final String CREATESHARD = "createshard";
 
   public static final String DELETEREPLICA = "deletereplica";
 
   public static final String MIGRATE = "migrate";
 
+  public static final String REQUESTID = "requestid";
+
   public static final String COLL_CONF = "collection.configName";
 
   public static final String COLL_PROP_PREFIX = "property.";
@@ -149,6 +154,9 @@ public class OverseerCollectionProcessor
       .getLogger(OverseerCollectionProcessor.class);
   
   private DistributedQueue workQueue;
+  private DistributedMap runningMap;
+  private DistributedMap completedMap;
+  private DistributedMap failureMap;
   
   private String myId;
 
@@ -161,15 +169,25 @@ public class OverseerCollectionProcessor
   private boolean isClosed;
   
   public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
-    this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()));
+    this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()),
+        Overseer.getRunningMap(zkStateReader.getZkClient()),
+        Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient()));
   }
 
-  protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath, DistributedQueue workQueue) {
+  protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler,
+                                        String adminPath,
+                                        DistributedQueue workQueue,
+                                        DistributedMap runningMap,
+                                        DistributedMap completedMap,
+                                        DistributedMap failureMap) {
     this.zkStateReader = zkStateReader;
     this.myId = myId;
     this.shardHandler = shardHandler;
     this.adminPath = adminPath;
     this.workQueue = workQueue;
+    this.runningMap = runningMap;
+    this.completedMap = completedMap;
+    this.failureMap = failureMap;
   }
   
   @Override
@@ -200,11 +218,35 @@ public class OverseerCollectionProcessor
            
            QueueEvent head = workQueue.peek(true);
            final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+
+           final String asyncId = (message.containsKey(ASYNC) && message.get(ASYNC) != null) ? (String) message.get(ASYNC) : null;
+
+           try {
+             if(message.containsKey(ASYNC) && message.get(ASYNC) != null && !runningMap.contains(message.getStr(ASYNC)))
+              runningMap.put(asyncId, null);
+           } catch (KeeperException.NodeExistsException e) {
+             // Just catch and do nothing. The runningMap.contains(..) check ensures that this is the only
+             // entry point into the runningMap.
+             // NOTE: Make sure to handle it as soon as OCP gets distributed/multi-threaded.
+           }
+
            log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
            final String operation = message.getStr(QUEUE_OPERATION);
            SolrResponse response = processMessage(message, operation);
+
            head.setBytes(SolrResponse.serializable(response));
+           if (!operation.equals(REQUESTSTATUS) && asyncId != null) {
+             if(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null) {
+               failureMap.put(asyncId, null);
+             } else {
+               completedMap.put(asyncId, null);
+             }
+           }
+           if(asyncId != null)
+            runningMap.remove(asyncId);
+
            workQueue.remove(head);
+
           log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
         } catch (KeeperException e) {
           if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -412,9 +454,9 @@ public class OverseerCollectionProcessor
         processRoleCommand(message, operation);
       } else if (ADDREPLICA.isEqual(operation))  {
         addReplica(zkStateReader.getClusterState(), message, results);
-      }
-
-      else {
+      } else if (REQUESTSTATUS.equals(operation)) {
+        requestStatus(message, results);
+      } else {
         throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
             + operation);
       }
@@ -764,13 +806,7 @@ public class OverseerCollectionProcessor
 
     }
 
-    ShardResponse srsp;
-    do {
-      srsp = shardHandler.takeCompletedOrError();
-      if (srsp != null) {
-        processResponse(results, srsp);
-      }
-    } while (srsp != null);
+    processResponses(results);
 
     log.info("Finished create command on all shards for collection: "
         + collectionName);
@@ -931,6 +967,9 @@ public class OverseerCollectionProcessor
       // the only side effect of this is that the sub shard may end up having more replicas than we want
       collectShardResponses(results, false, null);
 
+      String asyncId = message.getStr(ASYNC);
+      HashMap<String, String> requestMap = new HashMap<String, String>();
+
       for (int i=0; i<subRanges.size(); i++)  {
         String subSlice = subSlices.get(i);
         String subShardName = subShardNames.get(i);
@@ -957,12 +996,15 @@ public class OverseerCollectionProcessor
         params.set(CoreAdminParams.NAME, subShardName);
         params.set(CoreAdminParams.COLLECTION, collectionName);
         params.set(CoreAdminParams.SHARD, subSlice);
+        setupAsyncRequest(asyncId, requestMap, params, nodeName);
         addPropertyParams(message, params);
         sendShardRequest(nodeName, params);
       }
 
       collectShardResponses(results, true,
-          "SPLTSHARD failed to create subshard leaders");
+          "SPLITSHARD failed to create subshard leaders");
+
+      completeAsyncRequest(asyncId, requestMap, results);
 
       for (String subShardName : subShardNames) {
         // wait for parent leader to acknowledge the sub-shard core
@@ -975,12 +1017,18 @@ public class OverseerCollectionProcessor
         cmd.setState(ZkStateReader.ACTIVE);
         cmd.setCheckLive(true);
         cmd.setOnlyIfLeader(true);
-        sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
+
+        ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
+        setupAsyncRequest(asyncId, requestMap, p, nodeName);
+
+        sendShardRequest(nodeName, p);
       }
 
       collectShardResponses(results, true,
-          "SPLTSHARD timed out waiting for subshard leaders to come up");
-      
+          "SPLITSHARD timed out waiting for subshard leaders to come up");
+
+      completeAsyncRequest(asyncId, requestMap, results);
+
       log.info("Successfully created all sub-shards for collection "
           + collectionName + " parent shard: " + slice + " on: " + parentShardLeader);
 
@@ -996,9 +1044,12 @@ public class OverseerCollectionProcessor
         params.add(CoreAdminParams.TARGET_CORE, subShardName);
       }
       params.set(CoreAdminParams.RANGES, rangesStr);
+      setupAsyncRequest(asyncId, requestMap, params, parentShardLeader.getNodeName());
 
       sendShardRequest(parentShardLeader.getNodeName(), params);
+
       collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command");
+      completeAsyncRequest(asyncId, requestMap, results);
 
       log.info("Index on shard: " + nodeName + " split into two successfully");
 
@@ -1012,12 +1063,16 @@ public class OverseerCollectionProcessor
         params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
         params.set(CoreAdminParams.NAME, subShardName);
 
+        setupAsyncRequest(asyncId, requestMap, params, nodeName);
+
         sendShardRequest(nodeName, params);
       }
 
       collectShardResponses(results, true,
           "SPLITSHARD failed while asking sub shard leaders to apply buffered updates");
 
+      completeAsyncRequest(asyncId, requestMap, results);
+
       log.info("Successfully applied buffered updates on : " + subShardNames);
 
       // Replica creation for the new Slices
@@ -1067,6 +1122,12 @@ public class OverseerCollectionProcessor
           params.set(CoreAdminParams.NAME, shardName);
           params.set(CoreAdminParams.COLLECTION, collectionName);
           params.set(CoreAdminParams.SHARD, sliceName);
+          if(asyncId != null) {
+            String requestId = asyncId + Math.abs(System.nanoTime());
+            params.set(ASYNC, requestId);
+            requestMap.put(subShardNodeName, requestId);
+          }
+
           addPropertyParams(message, params);
           // TODO:  Figure the config used by the parent shard and use it.
           //params.set("collection.configName", configName);
@@ -1086,12 +1147,19 @@ public class OverseerCollectionProcessor
           cmd.setState(ZkStateReader.RECOVERING);
           cmd.setCheckLive(true);
           cmd.setOnlyIfLeader(true);
-          sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
+          ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
+
+          setupAsyncRequest(asyncId, requestMap, p, nodeName);
+
+          sendShardRequest(nodeName, p);
+
         }
       }
 
       collectShardResponses(results, true,
-          "SPLTSHARD failed to create subshard replicas or timed out waiting for them to come up");
+          "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up");
+
+      completeAsyncRequest(asyncId, requestMap, results);
 
       log.info("Successfully created all replica shards for all sub-slices " + subSlices);
 
@@ -1211,7 +1279,40 @@ public class OverseerCollectionProcessor
     } while (srsp != null);
   }
 
-  
+  private void requestStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+    log.info("Request status invoked");
+    String requestId = message.getStr(REQUESTID);
+
+    // Special requestId (-1) clears the completed and failed request state maps (the running map is left untouched).
+    if(requestId.equals("-1")) {
+      completedMap.clear();
+      failureMap.clear();
+      return;
+    }
+
+    if(completedMap.contains(requestId)) {
+      SimpleOrderedMap success = new SimpleOrderedMap();
+      success.add("state", "completed");
+      success.add("msg", "found " + requestId + " in completed tasks");
+      results.add("status", success);
+    } else if (runningMap.contains(requestId)) {
+      SimpleOrderedMap success = new SimpleOrderedMap();
+      success.add("state", "running");
+      success.add("msg", "found " + requestId + " in submitted tasks");
+      results.add("status", success);
+    } else if (failureMap.contains(requestId)) {
+      SimpleOrderedMap success = new SimpleOrderedMap();
+      success.add("state", "failed");
+      success.add("msg", "found " + requestId + " in failed tasks");
+      results.add("status", success);
+    } else {
+      SimpleOrderedMap failure = new SimpleOrderedMap();
+      failure.add("state", "notfound");
+      failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue");
+      results.add("status", failure);
+    }
+  }
+
   private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
     log.info("Delete shard invoked");
     String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
@@ -1222,7 +1323,7 @@ public class OverseerCollectionProcessor
     if (slice == null) {
       if(clusterState.hasCollection(collection)) {
         throw new SolrException(ErrorCode.BAD_REQUEST,
-            "No shard with the specified name exists: " + slice);
+            "No shard with the specified name exists: " + slice.getName());
       } else {
         throw new SolrException(ErrorCode.BAD_REQUEST,
             "No collection with the specified name exists: " + collection);
@@ -1242,13 +1343,7 @@ public class OverseerCollectionProcessor
       params.set(CoreAdminParams.DELETE_INDEX, "true");
       sliceCmd(clusterState, params, null, slice);
 
-      ShardResponse srsp;
-      do {
-        srsp = shardHandler.takeCompletedOrError();
-        if (srsp != null) {
-          processResponse(results, srsp);
-        }
-      } while (srsp != null);
+      processResponses(results);
 
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
           Overseer.REMOVESHARD, ZkStateReader.COLLECTION_PROP, collection,
@@ -1314,21 +1409,29 @@ public class OverseerCollectionProcessor
           "No active slices available in target collection: " + targetCollection + "for given split.key: " + splitKey);
     }
 
+    String asyncId = null;
+    if(message.containsKey(ASYNC) && message.get(ASYNC) != null)
+      asyncId = message.getStr(ASYNC);
+
     for (Slice sourceSlice : sourceSlices) {
       for (Slice targetSlice : targetSlices) {
         log.info("Migrating source shard: {} to target shard: {} for split.key = " + splitKey, sourceSlice, targetSlice);
-        migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey, timeout, results);
+        migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey, timeout, results, asyncId);
       }
     }
   }
 
-  private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice, DocCollection targetCollection, Slice targetSlice, String splitKey, int timeout, NamedList results) throws KeeperException, InterruptedException {
+  private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
+                          DocCollection targetCollection, Slice targetSlice,
+                          String splitKey, int timeout,
+                          NamedList results, String asyncId) throws KeeperException, InterruptedException {
     String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
     if (clusterState.hasCollection(tempSourceCollectionName)) {
       log.info("Deleting temporary collection: " + tempSourceCollectionName);
       Map<String, Object> props = ZkNodeProps.makeMap(
           QUEUE_OPERATION, DELETECOLLECTION,
           "name", tempSourceCollectionName);
+
       try {
         deleteCollection(new ZkNodeProps(props), results);
       } catch (Exception e) {
@@ -1350,15 +1453,23 @@ public class OverseerCollectionProcessor
     log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
 
     Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
+    // For tracking async calls.
+    HashMap<String, String> requestMap = new HashMap<String, String>();
 
     log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
         + targetLeader.getStr("core") + " to buffer updates");
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTBUFFERUPDATES.toString());
     params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+    String nodeName = targetLeader.getNodeName();
+    setupAsyncRequest(asyncId, requestMap, params, nodeName);
+
     sendShardRequest(targetLeader.getNodeName(), params);
+
     collectShardResponses(results, true, "MIGRATE failed to request node to buffer updates");
 
+    completeAsyncRequest(asyncId, requestMap, results);
+
     ZkNodeProps m = new ZkNodeProps(
         Overseer.QUEUE_OPERATION, Overseer.ADD_ROUTING_RULE,
         COLLECTION_PROP, sourceCollection.getName(),
@@ -1405,6 +1516,11 @@ public class OverseerCollectionProcessor
         NUM_SLICES, 1,
         COLL_CONF, configName,
         CREATE_NODE_SET, sourceLeader.getNodeName());
+    if(asyncId != null) {
+      String internalAsyncId = asyncId + Math.abs(System.nanoTime());
+      props.put(ASYNC, internalAsyncId);
+    }
+
     log.info("Creating temporary collection: " + props);
     createCollection(clusterState, new ZkNodeProps(props), results);
     // refresh cluster state
@@ -1437,8 +1553,13 @@ public class OverseerCollectionProcessor
     params.set(CoreAdminParams.RANGES, splitRange.toString());
     params.set("split.key", splitKey);
 
-    sendShardRequest(sourceLeader.getNodeName(), params);
+    String tempNodeName = sourceLeader.getNodeName();
+
+    setupAsyncRequest(asyncId, requestMap, params, tempNodeName);
+
+    sendShardRequest(tempNodeName, params);
     collectShardResponses(results, true, "MIGRATE failed to invoke SPLIT core admin command");
+    completeAsyncRequest(asyncId, requestMap, results);
 
     log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
         tempSourceCollectionName, targetLeader.getNodeName());
@@ -1448,7 +1569,13 @@ public class OverseerCollectionProcessor
     params.set(CoreAdminParams.NAME, tempCollectionReplica2);
     params.set(CoreAdminParams.COLLECTION, tempSourceCollectionName);
     params.set(CoreAdminParams.SHARD, tempSourceSlice.getName());
+
+    setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
     sendShardRequest(targetLeader.getNodeName(), params);
+    collectShardResponses(results, true,
+        "MIGRATE failed to create replica of temporary collection in target leader node.");
+
+    completeAsyncRequest(asyncId, requestMap, results);
 
     coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
         zkStateReader.getBaseUrlForNodeName(targetLeader.getNodeName()), tempCollectionReplica2);
@@ -1458,14 +1585,19 @@ public class OverseerCollectionProcessor
     cmd.setCoreName(tempSourceLeader.getStr("core"));
     cmd.setNodeName(targetLeader.getNodeName());
     cmd.setCoreNodeName(coreNodeName);
-    cmd.setState(ZkStateReader.ACTIVE); // todo introduce asynchronous actions
+    cmd.setState(ZkStateReader.ACTIVE);
     cmd.setCheckLive(true);
     cmd.setOnlyIfLeader(true);
-    sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()));
+    params = new ModifiableSolrParams(cmd.getParams());
+
+    setupAsyncRequest(asyncId, requestMap, params, tempSourceLeader.getNodeName());
+
+    sendShardRequest(tempSourceLeader.getNodeName(), params);
 
     collectShardResponses(results, true,
         "MIGRATE failed to create temp collection replica or timed out waiting for them to come up");
 
+    completeAsyncRequest(asyncId, requestMap, results);
     log.info("Successfully created replica of temp source collection on target leader node");
 
     log.info("Requesting merge of temp source collection replica to target leader");
@@ -1473,18 +1605,27 @@ public class OverseerCollectionProcessor
     params.set(CoreAdminParams.ACTION, CoreAdminAction.MERGEINDEXES.toString());
     params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
     params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
+
+    setupAsyncRequest(asyncId, requestMap, params, sourceLeader.getNodeName());
+
     sendShardRequest(targetLeader.getNodeName(), params);
     collectShardResponses(results, true,
         "MIGRATE failed to merge " + tempCollectionReplica2 + " to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName());
 
+    completeAsyncRequest(asyncId, requestMap, results);
+
     log.info("Asking target leader to apply buffered updates");
     params = new ModifiableSolrParams();
     params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
     params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+    setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
+
     sendShardRequest(targetLeader.getNodeName(), params);
     collectShardResponses(results, true,
         "MIGRATE failed to request node to apply buffered updates");
 
+    completeAsyncRequest(asyncId, requestMap, results);
+
     try {
       log.info("Deleting temporary collection: " + tempSourceCollectionName);
       props = ZkNodeProps.makeMap(
@@ -1497,6 +1638,21 @@ public class OverseerCollectionProcessor
     }
   }
 
+  private void completeAsyncRequest(String asyncId, HashMap<String, String> requestMap, NamedList results) {
+    if(asyncId != null) {
+      waitForAsyncCallsToComplete(requestMap, results);
+      requestMap.clear();
+    }
+  }
+
+  private void setupAsyncRequest(String asyncId, HashMap<String, String> requestMap, ModifiableSolrParams params, String nodeName) {
+    if(asyncId != null) {
+      String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
+      params.set(ASYNC, coreAdminAsyncId);
+      requestMap.put(nodeName, coreAdminAsyncId);
+    }
+  }
+
   private DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
     if (a == null || b == null || !a.overlaps(b)) {
       return null;
@@ -1542,6 +1698,11 @@ public class OverseerCollectionProcessor
       // if it does not, find best nodes to create more cores
       
       int repFactor = message.getInt( REPLICATION_FACTOR, 1);
+
+      String async = null;
+      if (message.containsKey("async"))
+        async = message.getStr("async");
+
       Integer numSlices = message.getInt(NUM_SLICES, null);
       String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
       List<String> shardNames = new ArrayList<>();
@@ -1627,6 +1788,9 @@ public class OverseerCollectionProcessor
       if (!created)
         throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully createcollection: " + message.getStr("name"));
 
+      // For tracking async calls.
+      HashMap<String, String> requestMap = new HashMap<String, String>();
+
       log.info("Creating SolrCores for new collection, shardNames {} , replicationFactor : {}", shardNames, repFactor);
       Map<String ,ShardRequest> coresToCreate = new LinkedHashMap<>();
       for (int i = 1; i <= shardNames.size(); i++) {
@@ -1662,6 +1826,9 @@ public class OverseerCollectionProcessor
           params.set(CoreAdminParams.COLLECTION, collectionName);
           params.set(CoreAdminParams.SHARD, sliceName);
           params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+
+          setupAsyncRequest(async, requestMap, params, nodeName);
+
           addPropertyParams(message, params);
 
           ShardRequest sreq = new ShardRequest();
@@ -1689,13 +1856,9 @@ public class OverseerCollectionProcessor
         }
       }
 
-      ShardResponse srsp;
-      do {
-        srsp = shardHandler.takeCompletedOrError();
-        if (srsp != null) {
-          processResponse(results, srsp);
-        }
-      } while (srsp != null);
+      processResponses(results);
+
+      completeAsyncRequest(async, requestMap, results);
 
       log.info("Finished create command on all shards for collection: "
           + collectionName);
@@ -1829,6 +1992,18 @@ public class OverseerCollectionProcessor
       "ADDREPLICA failed to create replica");
   }
 
+
+  /** Passes each response from shardHandler.takeCompletedOrError() to processResponse until null is returned. */
+  private void processResponses(NamedList results) {
+    for (ShardResponse next = shardHandler.takeCompletedOrError();
+         next != null;
+         next = shardHandler.takeCompletedOrError()) {
+      // Every response is handed on together with the caller-supplied results list.
+      processResponse(results, next);
+    }
+  }
+
+
   private String createConfNode(String coll, ZkNodeProps message, boolean isLegacyCloud) throws KeeperException, InterruptedException {
     String configName = message.getStr(OverseerCollectionProcessor.COLL_CONF);
     if(configName == null){
@@ -1873,14 +2048,8 @@ public class OverseerCollectionProcessor
       Slice slice = entry.getValue();
       sliceCmd(clusterState, params, stateMatcher, slice);
     }
-    
-    ShardResponse srsp;
-    do {
-      srsp = shardHandler.takeCompletedOrError();
-      if (srsp != null) {
-        processResponse(results, srsp);
-      }
-    } while (srsp != null);
+
+    processResponses(results);
 
   }
 
@@ -1950,4 +2119,70 @@ public class OverseerCollectionProcessor
     return isClosed;
   }
 
+  private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
+    for (Map.Entry<String, String> pending : requestMap.entrySet()) { // node name -> request id
+      log.debug("I am Waiting for : " + pending.getKey() + "/" + pending.getValue());
+      results.add(pending.getValue(), waitForCoreAdminAsyncCallToComplete(pending.getKey(), pending.getValue()));
+    }
+  }
+
+  /** Polls REQUESTSTATUS on nodeName (sleeping 1s between polls) until task requestId is reported completed or failed. */
+  private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
+    params.set(CoreAdminParams.REQUESTID, requestId);
+    params.set("qt", adminPath);
+    int counter = 0; // number of "notfound" answers seen so far
+    do {
+      ShardRequest sreq = new ShardRequest();
+      sreq.purpose = 1;
+      String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
+      sreq.shards = new String[] {replica};
+      sreq.actualShards = sreq.shards;
+      sreq.params = params;
+
+      shardHandler.submit(sreq, replica, sreq.params);
+
+      ShardResponse srsp;
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          NamedList results = new NamedList();
+          processResponse(results, srsp);
+          // Constant-first equals below: a response without a STATUS entry must not cause an NPE.
+          String r = (String) srsp.getSolrResponse().getResponse().get("STATUS");
+          if ("completed".equals(r)) {
+            log.debug("The task is COMPLETED, returning");
+            return srsp.getSolrResponse().getResponse();
+          } else if ("failed".equals(r)) {
+            // TODO: Improve this. Get more information.
+            log.debug("The task is FAILED, returning");
+            return srsp.getSolrResponse().getResponse();
+          } else if ("running".equals(r)) {
+            log.debug("The task is still RUNNING, continuing to wait.");
+          } else if ("notfound".equals(r)) {
+            log.debug("The task is notfound, retry");
+            if (counter++ >= 5) {
+              throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request: " + r +
+                  " retried " + counter + " times");
+            }
+          } else {
+            throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request " + r);
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            // Restore the flag and stop polling instead of swallowing the interrupt.
+            Thread.currentThread().interrupt();
+            throw new SolrException(ErrorCode.SERVER_ERROR,
+                "Interrupted while waiting for async request " + requestId + " on " + nodeName, e);
+          }
+          if ("notfound".equals(r)) {
+            break; // re-submit the status request instead of taking further responses
+          }
+        }
+      } while (srsp != null);
+    } while (true);
+  }
+
 }

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Mon Mar 17 21:29:03 2014
@@ -92,6 +92,10 @@ public final class ZkController {
   
   private final DistributedQueue overseerJobQueue;
   private final DistributedQueue overseerCollectionQueue;
+
+  private final DistributedMap overseerRunningMap;
+  private final DistributedMap overseerCompletedMap;
+  private final DistributedMap overseerFailureMap;
   
   public static final String CONFIGS_ZKNODE = "/configs";
 
@@ -279,6 +283,9 @@ public final class ZkController {
     
     this.overseerJobQueue = Overseer.getInQueue(zkClient);
     this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
+    this.overseerRunningMap = Overseer.getRunningMap(zkClient);
+    this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
+    this.overseerFailureMap = Overseer.getFailureMap(zkClient);
     cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
     leaderElector = new LeaderElector(zkClient);
     zkStateReader = new ZkStateReader(zkClient);
@@ -1583,6 +1590,18 @@ public final class ZkController {
   public DistributedQueue getOverseerCollectionQueue() {
     return overseerCollectionQueue;
   }
+
+  public DistributedMap getOverseerRunningMap() {
+    return overseerRunningMap; // final field, assigned from Overseer.getRunningMap(zkClient)
+  }
+
+  public DistributedMap getOverseerCompletedMap() {
+    return overseerCompletedMap; // final field, assigned from Overseer.getCompletedMap(zkClient)
+  }
+
+  public DistributedMap getOverseerFailureMap() {
+    return overseerFailureMap; // final field, assigned from Overseer.getFailureMap(zkClient)
+  }
   
   public int getClientTimeout() {
     return clientTimeout;

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java Mon Mar 17 21:29:03 2014
@@ -368,6 +368,13 @@ public class CoreContainer {
     }
 
     try {
+      coreAdminHandler.shutdown();
+    } catch (Exception e) {
+      log.warn("Error shutting down CoreAdminHandler. Continuing to shutdown CoreContainer.");
+      e.printStackTrace();
+    }
+
+    try {
       // First wake up the closer thread, it'll terminate almost immediately since it checks isShutDown.
       synchronized (solrCores.getModifyLock()) {
         solrCores.getModifyLock().notifyAll(); // wake up anyone waiting

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Mon Mar 17 21:29:03 2014
@@ -18,6 +18,7 @@ package org.apache.solr.handler.admin;
  */
 
 import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATESHARD;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
@@ -25,6 +26,7 @@ import static org.apache.solr.cloud.Over
 import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
 import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
@@ -51,6 +53,7 @@ import org.apache.solr.client.solrj.requ
 import org.apache.solr.cloud.DistributedQueue.QueueEvent;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.OverseerCollectionProcessor;
+import org.apache.solr.cloud.OverseerSolrResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
@@ -199,6 +202,10 @@ public class CollectionsHandler extends 
         this.handleAddReplica(req, rsp);
         break;
       }
+      case REQUESTSTATUS: {
+        this.handleRequestStatus(req, rsp);
+        break;
+      }
       default: {
           throw new RuntimeException("Unknown action: " + action);
       }
@@ -236,6 +243,16 @@ public class CollectionsHandler extends 
 
   public static long DEFAULT_ZK_TIMEOUT = 180*1000;
 
+  /** Handles REQUESTSTATUS: requires REQUESTID and passes it to handleResponse with the default timeout. */
+  private void handleRequestStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+    log.debug("REQUESTSTATUS action invoked: " + req.getParamString());
+    req.getParams().required().check(REQUESTID);
+    Map<String, Object> statusProps = new HashMap<>();
+    statusProps.put(QUEUE_OPERATION, OverseerCollectionProcessor.REQUESTSTATUS);
+    statusProps.put(REQUESTID, req.getParams().get(REQUESTID));
+    handleResponse(OverseerCollectionProcessor.REQUESTSTATUS, new ZkNodeProps(statusProps), rsp);
+  }
+
   private void handleResponse(String operation, ZkNodeProps m,
                               SolrQueryResponse rsp) throws KeeperException, InterruptedException {
     handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
@@ -244,6 +261,35 @@ public class CollectionsHandler extends 
   private void handleResponse(String operation, ZkNodeProps m,
       SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
     long time = System.nanoTime();
+
+     if(m.containsKey(ASYNC) && m.get(ASYNC) != null) {
+ 
+       String asyncId = m.getStr(ASYNC);
+ 
+       if(asyncId.equals("-1")) {
+         throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
+       }
+ 
+       NamedList<String> r = new NamedList<>();
+ 
+       if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
+           coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
+           coreContainer.getZkController().getOverseerRunningMap().contains(asyncId)) {
+         r.add("error", "Task with the same requestid already exists.");
+ 
+       } else {
+         coreContainer.getZkController().getOverseerCollectionQueue()
+             .offer(ZkStateReader.toJSON(m));
+ 
+       }
+       r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
+       SolrResponse response = new OverseerSolrResponse(r);
+ 
+       rsp.getValues().addAll(response.getResponse());
+ 
+       return;
+     }
+
     QueueEvent event = coreContainer.getZkController()
         .getOverseerCollectionQueue()
         .offer(ZkStateReader.toJSON(m), timeout);
@@ -368,6 +414,7 @@ public class CollectionsHandler extends 
          MAX_SHARDS_PER_NODE,
         CREATE_NODE_SET ,
         SHARDS_PROP,
+        ASYNC,
         "router.");
 
     copyPropertiesIfNotNull(req.getParams(), props);
@@ -380,7 +427,7 @@ public class CollectionsHandler extends 
     log.info("Remove replica: " + req.getParamString());
     req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP, "replica");
     Map<String, Object> map = makeMap(QUEUE_OPERATION, DELETEREPLICA);
-    copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica");
+    copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica", ASYNC);
     ZkNodeProps m = new ZkNodeProps(map);
     handleResponse(DELETEREPLICA, m, rsp);
   }
@@ -394,7 +441,7 @@ public class CollectionsHandler extends 
       throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections" );
 
     Map<String, Object> map = makeMap(QUEUE_OPERATION, CREATESHARD);
-    copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR,CREATE_NODE_SET);
+    copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR,CREATE_NODE_SET, ASYNC);
     copyPropertiesIfNotNull(req.getParams(), map);
     ZkNodeProps m = new ZkNodeProps(map);
     handleResponse(CREATESHARD, m, rsp);
@@ -485,6 +532,10 @@ public class CollectionsHandler extends 
     if (rangesStr != null)  {
       props.put(CoreAdminParams.RANGES, rangesStr);
     }
+
+    if (req.getParams().get(ASYNC) != null)
+      props.put(ASYNC, req.getParams().get(ASYNC));
+
     copyPropertiesIfNotNull(req.getParams(), props);
 
     ZkNodeProps m = new ZkNodeProps(props);
@@ -497,7 +548,7 @@ public class CollectionsHandler extends 
     req.getParams().required().check("collection", "split.key", "target.collection");
     Map<String,Object> props = new HashMap<>();
     props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.MIGRATE);
-    copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout");
+    copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout", ASYNC);
     ZkNodeProps m = new ZkNodeProps(props);
     handleResponse(OverseerCollectionProcessor.MIGRATE, m, rsp, DEFAULT_ZK_TIMEOUT * 20);
   }

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Mon Mar 17 21:29:03 2014
@@ -17,21 +17,8 @@
 
 package org.apache.solr.handler.admin;
 
-import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.lucene.index.DirectoryReader;
@@ -55,6 +42,7 @@ import org.apache.solr.common.params.Cor
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.core.CloseHook;
@@ -75,14 +63,31 @@ import org.apache.solr.update.SplitIndex
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.processor.UpdateRequestProcessor;
 import org.apache.solr.update.processor.UpdateRequestProcessorChain;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.NumberUtils;
 import org.apache.solr.util.RefCounted;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
 
 /**
  *
@@ -91,6 +96,24 @@ import com.google.common.collect.Lists;
 public class CoreAdminHandler extends RequestHandlerBase {
   protected static Logger log = LoggerFactory.getLogger(CoreAdminHandler.class);
   protected final CoreContainer coreContainer;
+  protected static HashMap<String, Map<String, TaskObject>> requestStatusMap =
+      new HashMap<String,Map<String, TaskObject>>();
+
+  protected ExecutorService parallelExecutor = null;
+
+  protected static int MAX_TRACKED_REQUESTS = 100;
+  public static String RUNNING = "running";
+  public static String COMPLETED = "completed";
+  public static String FAILED = "failed";
+  public static String RESPONSE = "Response";
+  public static String RESPONSE_STATUS = "STATUS";
+  public static String RESPONSE_MESSAGE = "msg";
+
+  static {
+    requestStatusMap.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
+    requestStatusMap.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
+    requestStatusMap.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
+  }
 
   public CoreAdminHandler() {
     super();
@@ -136,6 +159,18 @@ public class CoreAdminHandler extends Re
               "Core container instance missing");
     }
     //boolean doPersist = false;
+    String taskId = req.getParams().get("async");
+    TaskObject taskObject = new TaskObject(taskId);
+
+    if(taskId != null) {
+      // Put the tasks into the maps for tracking
+      if (getMap(RUNNING).containsKey(taskId) || getMap(COMPLETED).containsKey(taskId) || getMap(FAILED).containsKey(taskId)) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "Duplicate request with the same requestid found.");
+      }
+
+      addTask(RUNNING, taskObject);
+    }
 
     // Pick the action
     SolrParams params = req.getParams();
@@ -147,6 +182,19 @@ public class CoreAdminHandler extends Re
         this.handleCustomAction(req, rsp);
       }
     }
+
+    if (taskId == null) {
+      handleRequestInternal(req, rsp, action);
+    } else {
+      ParallelCoreAdminHandlerThread parallelHandlerThread = new ParallelCoreAdminHandlerThread(req, rsp, action, taskObject);
+      if(parallelExecutor == null || parallelExecutor.isShutdown())
+        parallelExecutor = Executors.newFixedThreadPool(50,
+                  new DefaultSolrThreadFactory("parallelCoreAdminExecutor"));
+        parallelExecutor.execute(parallelHandlerThread);
+    }
+  }
+
+  protected void handleRequestInternal(SolrQueryRequest req, SolrQueryResponse rsp, CoreAdminAction action) throws Exception {
     if (action != null) {
       switch (action) {
         case CREATE: {
@@ -199,17 +247,17 @@ public class CoreAdminHandler extends Re
           this.handleWaitForStateAction(req, rsp);
           break;
         }
-        
+
         case REQUESTRECOVERY: {
           this.handleRequestRecoveryAction(req, rsp);
           break;
         }
-        
+
         case REQUESTSYNCSHARD: {
           this.handleRequestSyncAction(req, rsp);
           break;
         }
-        
+
         // todo : Can this be done by the regular RecoveryStrategy route?
         case REQUESTAPPLYUPDATES: {
           this.handleRequestApplyUpdatesAction(req, rsp);
@@ -219,6 +267,10 @@ public class CoreAdminHandler extends Re
           this.handleRequestBufferUpdatesAction(req, rsp);
           break;
         }
+        case REQUESTSTATUS: {
+          this.handleRequestActionStatus(req, rsp);
+          break;
+        }
         case OVERSEEROP:{
           ZkController zkController = coreContainer.getZkController();
           if(zkController != null){
@@ -240,7 +292,7 @@ public class CoreAdminHandler extends Re
     rsp.setHttpCaching(false);
   }
 
-  
+
   /**
    * Handle the core admin SPLIT action.
    */
@@ -755,6 +807,28 @@ public class CoreAdminHandler extends Re
   }
 
   /**
+   * Handle "REQUESTSTATUS" action
+   */
+  protected void handleRequestActionStatus(SolrQueryRequest req, SolrQueryResponse rsp) {
+    String requestId = req.getParams().get(CoreAdminParams.REQUESTID);
+    log.info("Checking request status for : " + requestId);
+    // Fetch finished tasks with a single get(): an entry can be evicted between containsKey() and get().
+    TaskObject done;
+    if (mapContainsTask(RUNNING, requestId)) {
+      rsp.add(RESPONSE_STATUS, RUNNING);
+    } else if ((done = getMap(COMPLETED).get(requestId)) != null) {
+      rsp.add(RESPONSE_STATUS, COMPLETED);
+      rsp.add(RESPONSE, done.getRspObject());
+    } else if ((done = getMap(FAILED).get(requestId)) != null) {
+      rsp.add(RESPONSE_STATUS, FAILED);
+      rsp.add(RESPONSE, done.getRspObject());
+    } else {
+      rsp.add(RESPONSE_STATUS, "notfound");
+      rsp.add(RESPONSE_MESSAGE, "No task found in running, completed or failed tasks");
+    }
+  }
+
+  /**
    * Handle "SWAP" action
    */
   protected void handleSwapAction(SolrQueryRequest req, SolrQueryResponse rsp) {
@@ -1172,4 +1246,123 @@ public class CoreAdminHandler extends Re
   public String getSource() {
     return "$URL$";
   }
+
+  /**
+   * Class to implement multi-threaded CoreAdminHandler behaviour.
+   * This accepts all of the context from handleRequestBody.
+   */
+  protected class ParallelCoreAdminHandlerThread implements Runnable {
+    SolrQueryRequest req;
+    SolrQueryResponse rsp;
+    CoreAdminAction action;
+    TaskObject taskObject;
+
+    public ParallelCoreAdminHandlerThread (SolrQueryRequest req, SolrQueryResponse rsp,
+                                           CoreAdminAction action, TaskObject taskObject){
+      this.req = req;
+      this.rsp = rsp;
+      this.action = action;
+      this.taskObject = taskObject;
+    }
+
+    /** Runs the action, then moves the task out of RUNNING into COMPLETED, or into FAILED unless it returned normally. */
+    @Override
+    public void run() {
+      // Set only after a normal return, so an Error escaping the handler is not recorded as completed.
+      boolean succeeded = false;
+      try {
+        handleRequestInternal(req, rsp, action);
+        taskObject.setRspObject(rsp);
+        succeeded = true;
+      } catch (Exception e) {
+        // The exception text becomes the stored response of the failed task.
+        taskObject.setRspObjectFromException(e);
+      } finally {
+        removeTask(RUNNING, taskObject.taskId);
+        // 'true' asks addTask to cap the target map at MAX_TRACKED_REQUESTS entries.
+        addTask(succeeded ? COMPLETED : FAILED, taskObject, true);
+      }
+    }
+  }
+
+  /**
+   * Helper class to manage the tasks to be tracked.
+   * This contains the taskId, request and the response (if available).
+   */
+  private class TaskObject {
+    String taskId;
+    String rspInfo; // textual outcome; stays null until one of the setters below is called
+    public TaskObject(String taskId) {
+      this.taskId = taskId;
+    }
+
+    public String getRspObject() {
+      return rspInfo;
+    }
+
+    public void setRspObject(SolrQueryResponse rspObject) {
+      this.rspInfo = rspObject.getToLogAsString("TaskId: " + this.taskId + " ");
+    }
+
+    public void setRspObjectFromException(Exception e) {
+      // Some exceptions (e.g. NullPointerException) carry no message; fall back to toString().
+      this.rspInfo = e.getMessage() != null ? e.getMessage() : e.toString();
+    }
+  }
+
+  /** Adds a task to a tracking map; when limit is set and the map is full, the oldest entry is evicted first. */
+  protected void addTask(String map, TaskObject o, boolean limit) {
+    Map<String, TaskObject> tasks = getMap(map);
+    synchronized (tasks) { // evict + insert atomically; synchronizedMap views must be iterated under the map's lock
+      if (limit && !tasks.isEmpty() && tasks.size() >= MAX_TRACKED_REQUESTS) {
+        tasks.remove(tasks.keySet().iterator().next()); // first key of the insertion-ordered map
+      }
+      tasks.put(o.taskId, o);
+    }
+  }
+
+
+  protected void addTask(String map, TaskObject o) {
+    final Map<String, TaskObject> tasks = getMap(map);
+    // The task is stored under its own id while the map's monitor is held.
+    synchronized (tasks) { tasks.put(o.taskId, o); }
+  }
+
+  /**
+   * Helper method to remove a task from a tracking map.
+   */
+  protected void removeTask(String map, String taskId) {
+    final Map<String, TaskObject> tasks = getMap(map);
+    // The removal happens while the map's monitor is held.
+    synchronized (tasks) { tasks.remove(taskId); }
+  }
+
+  /**
+   * Checks whether the tracking map registered under {@code map} (RUNNING, COMPLETED or FAILED) has an entry for taskId.
+   */
+  protected boolean mapContainsTask(String map, String taskId) {
+    return getMap(map).containsKey(taskId);
+  }
+
+  /**
+   * Returns the TaskObject stored under taskId in the tracking map named {@code map}, or null if it has no such entry.
+   */
+  protected TaskObject getTask(String map, String taskId) {
+    return getMap(map).get(taskId);
+  }
+
+  /**
+   * Looks up a tracking map in requestStatusMap by name; a name that was never registered there yields null.
+   */
+  private Map<String, TaskObject> getMap(String map) {
+    return requestStatusMap.get(map);
+  }
+
+  /**
+   * Shuts down the executor used for async requests, provided one exists and is not already shut down.
+   */
+  public void shutdown() {
+    boolean active = parallelExecutor != null && !parallelExecutor.isShutdown();
+    if (active) ExecutorUtil.shutdownAndAwaitTermination(parallelExecutor);
+  }
 }

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java Mon Mar 17 21:29:03 2014
@@ -114,7 +114,7 @@ public class MigrateRouteKeyTest extends
     return ruleRemoved;
   }
 
-  private void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
+  protected void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set("action", CollectionParams.CollectionAction.MIGRATE.toString());
     params.set("collection", sourceCollection);
@@ -125,7 +125,7 @@ public class MigrateRouteKeyTest extends
     invoke(params);
   }
 
-  private void invoke(ModifiableSolrParams params) throws SolrServerException, IOException {
+  protected void invoke(ModifiableSolrParams params) throws SolrServerException, IOException {
     SolrRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
 
@@ -161,7 +161,7 @@ public class MigrateRouteKeyTest extends
     waitForRecoveriesToFinish(targetCollection, false);
   }
 
-  private void multipleShardMigrateTest() throws Exception  {
+  protected void multipleShardMigrateTest() throws Exception  {
     del("*:*");
     commit();
     assertTrue(cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound() == 0);

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java Mon Mar 17 21:29:03 2014
@@ -71,6 +71,9 @@ public class OverseerCollectionProcessor
   private static final String CONFIG_NAME = "myconfig";
   
   private static DistributedQueue workQueueMock;
+  private static DistributedMap runningMapMock;
+  private static DistributedMap completedMapMock;
+  private static DistributedMap failureMapMock;
   private static ShardHandler shardHandlerMock;
   private static ZkStateReader zkStateReaderMock;
   private static ClusterState clusterStateMock;
@@ -90,8 +93,10 @@ public class OverseerCollectionProcessor
     
     public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
         String myId, ShardHandler shardHandler, String adminPath,
-        DistributedQueue workQueue) {
-      super(zkStateReader, myId, shardHandler, adminPath, workQueue);
+        DistributedQueue workQueue, DistributedMap runningMap,
+        DistributedMap completedMap,
+        DistributedMap failureMap) {
+      super(zkStateReader, myId, shardHandler, adminPath, workQueue, runningMap, completedMap, failureMap);
     }
     
     @Override
@@ -111,6 +116,9 @@ public class OverseerCollectionProcessor
   @BeforeClass
   public static void setUpOnce() throws Exception {
     workQueueMock = createMock(DistributedQueue.class);
+    runningMapMock = createMock(DistributedMap.class);
+    completedMapMock = createMock(DistributedMap.class);
+    failureMapMock = createMock(DistributedMap.class);
     shardHandlerMock = createMock(ShardHandler.class);
     zkStateReaderMock = createMock(ZkStateReader.class);
     clusterStateMock = createMock(ClusterState.class);
@@ -120,6 +128,9 @@ public class OverseerCollectionProcessor
   @AfterClass
   public static void tearDownOnce() {
     workQueueMock = null;
+    runningMapMock = null;
+    completedMapMock = null;
+    failureMapMock = null;
     shardHandlerMock = null;
     zkStateReaderMock = null;
     clusterStateMock = null;
@@ -131,13 +142,16 @@ public class OverseerCollectionProcessor
     super.setUp();
     queue.clear();
     reset(workQueueMock);
-    reset(workQueueMock);
+    reset(runningMapMock);
+    reset(completedMapMock);
+    reset(failureMapMock);
     reset(shardHandlerMock);
     reset(zkStateReaderMock);
     reset(clusterStateMock);
     reset(solrZkClientMock);
     underTest = new OverseerCollectionProcessorToBeTested(zkStateReaderMock,
-        "1234", shardHandlerMock, ADMIN_PATH, workQueueMock);
+        "1234", shardHandlerMock, ADMIN_PATH, workQueueMock, runningMapMock,
+        completedMapMock, failureMapMock);
     zkMap.clear();
     collectionsSet.clear();
   }

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java Mon Mar 17 21:29:03 2014
@@ -17,21 +17,21 @@
 
 package org.apache.solr.client.solrj.request;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
-import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ContentStream;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
 /**
  * This class is experimental and subject to change.
  *
@@ -41,6 +41,7 @@ public class CollectionAdminRequest exte
 {
   protected String collection = null;
   protected CollectionAction action = null;
+  protected String asyncId = null;
 
   protected static class CollectionShardAdminRequest extends CollectionAdminRequest {
     protected String shardName = null;
@@ -53,6 +54,7 @@ public class CollectionAdminRequest exte
       params.remove( "name" );
       params.set( "collection", collection );
       params.set( "shard", shardName);
+      params.set( "async", asyncId);
       return params;
     }
 
@@ -124,6 +126,9 @@ public class CollectionAdminRequest exte
         // OverseerCollectionProcessor.REPLICATION_FACTOR
         params.set( "replicationFactor", replicationFactor);
       }
+      if (asyncId != null) {
+        params.set("async", asyncId);
+      }
 
       return params;
     }
@@ -188,6 +193,25 @@ public class CollectionAdminRequest exte
     }
   }
 
+  /** Collection admin request (action REQUESTSTATUS) that carries the id of the request to look up. */
+  public static class RequestStatus extends CollectionAdminRequest {
+    protected String requestId = null; // sent as the "requestid" parameter
+
+    public RequestStatus() {
+      action = CollectionAction.REQUESTSTATUS;
+    }
+
+    public void setRequestId(String requestId) { this.requestId = requestId; }
+    public String getRequestId() { return this.requestId; }
+
+    @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
+      params.set(CoreAdminParams.REQUESTID, requestId); // shared constant, value "requestid"
+      return params;
+    }
+  }
+
   //a collection alias create request
   public static class CreateAlias extends CollectionAdminRequest {
     protected String aliasedCollections = null;
@@ -239,6 +263,10 @@ public class CollectionAdminRequest exte
     this.action = action;
   }
 
+  public void setAsyncId(String asyncId) {
+    this.asyncId = asyncId;
+  }
+
   //---------------------------------------------------------------------------------------
   //
   //---------------------------------------------------------------------------------------
@@ -285,7 +313,19 @@ public class CollectionAdminRequest exte
                                                           String nodeSet,
                                                           String conf,
                                                           String routerField,
-                                                          SolrServer server ) throws SolrServerException, IOException
+                                                          SolrServer server) throws SolrServerException, IOException
+  {
+    return createCollection(name, shards, repl, maxShards, nodeSet, conf, routerField, server, null);
+  }
+
+  // creates collection using a compositeId router
+  public static CollectionAdminResponse createCollection( String name,
+                                                          Integer shards, Integer repl, Integer maxShards,
+                                                          String nodeSet,
+                                                          String conf,
+                                                          String routerField,
+                                                          SolrServer server,
+                                                          String asyncId) throws SolrServerException, IOException
   {
     Create req = new Create();
     req.setCollectionName(name);
@@ -296,27 +336,47 @@ public class CollectionAdminRequest exte
     req.setCreateNodeSet(nodeSet);
     req.setConfigName(conf);
     req.setRouterField(routerField);
+    req.setAsyncId(asyncId);
     return req.process( server );
   }
+
+  public static CollectionAdminResponse createCollection(String name, Integer shards,
+                                                        String conf,
+                                                        SolrServer server) throws SolrServerException, IOException {
+    return createCollection(name, shards, conf, server, null);
+  }
+
   public static CollectionAdminResponse createCollection( String name,
                                                           Integer shards, String conf,
-                                                          SolrServer server ) throws SolrServerException, IOException
+                                                          SolrServer server,
+                                                          String asyncId) throws SolrServerException, IOException
   {
     Create req = new Create();
     req.setCollectionName(name);
     req.setRouterName("compositeId");
     req.setNumShards(shards);
     req.setConfigName(conf);
+    req.setAsyncId(asyncId);
     return req.process( server );
   }
 
+  // String-shards variant without a node set or async id; delegates to the nine-argument overload
+  // (the previous seven-argument call resolved back to this method and recursed without end)
+  public static CollectionAdminResponse createCollection(String name, String shards,
+                                                         Integer repl, Integer maxShards,
+                                                         String conf, String routerField,
+                                                         SolrServer server) throws SolrServerException, IOException {
+    return createCollection(name, shards, repl, maxShards, null, conf, routerField, server, null);
+  }
+
   // creates a collection using an implicit router
   public static CollectionAdminResponse createCollection( String name,
                                                           String shards, Integer repl, Integer maxShards,
                                                           String nodeSet,
                                                           String conf,
                                                           String routerField,
-                                                          SolrServer server ) throws SolrServerException, IOException
+                                                          SolrServer server,
+                                                          String asyncId) throws SolrServerException, IOException
   {
     Create req = new Create();
     req.setCollectionName(name);
@@ -327,34 +387,68 @@ public class CollectionAdminRequest exte
     req.setCreateNodeSet(nodeSet);
     req.setConfigName(conf);
     req.setRouterField(routerField);
+    req.setAsyncId(asyncId);
     return req.process( server );
   }
+
+  public static CollectionAdminResponse createCollection( String name,
+                                                          String shards, String conf,
+                                                          SolrServer server) throws SolrServerException, IOException
+  {
+    return createCollection(name, shards, conf, server, null);
+  }
+
   public static CollectionAdminResponse createCollection( String name,
                                                           String shards, String conf,
-                                                          SolrServer server ) throws SolrServerException, IOException
+                                                          SolrServer server, String asyncId ) throws SolrServerException, IOException
   {
     Create req = new Create();
     req.setCollectionName(name);
     req.setRouterName("implicit");
     req.setShards(shards);
     req.setConfigName(conf);
+    req.setAsyncId(asyncId);
     return req.process( server );
   }
 
-  public static CollectionAdminResponse reloadCollection( String name, SolrServer server ) throws SolrServerException, IOException
+  public static CollectionAdminResponse reloadCollection( String name, SolrServer server)
+      throws SolrServerException, IOException {
+    return reloadCollection(name, server, null);
+  }
+
+  public static CollectionAdminResponse reloadCollection( String name, SolrServer server, String asyncId )
+      throws SolrServerException, IOException
   {
     CollectionAdminRequest req = new Reload();
     req.setCollectionName(name);
+    req.setAsyncId(asyncId);
     return req.process( server );
   }
 
-  public static CollectionAdminResponse deleteCollection( String name, SolrServer server ) throws SolrServerException, IOException
+  public static CollectionAdminResponse deleteCollection( String name, SolrServer server)
+      throws SolrServerException, IOException
+  {
+    return deleteCollection(name, server, null);
+  }
+
+  public static CollectionAdminResponse deleteCollection( String name, SolrServer server,
+                                                          String asyncId)
+      throws SolrServerException, IOException
   {
     CollectionAdminRequest req = new Delete();
     req.setCollectionName(name);
+    req.setAsyncId(asyncId);
     return req.process( server );
   }
 
+  // builds a RequestStatus for the given request id and processes it against the server
+  public static CollectionAdminResponse requestStatus(String requestId, SolrServer server)
+      throws SolrServerException, IOException {
+    RequestStatus statusRequest = new RequestStatus();
+    statusRequest.setRequestId(requestId);
+    return statusRequest.process(server);
+  }
+
   public static CollectionAdminResponse createShard( String name, String shard, String nodeSet, SolrServer server ) throws SolrServerException, IOException
   {
     CreateShard req = new CreateShard();
@@ -370,15 +464,29 @@ public class CollectionAdminRequest exte
 
   public static CollectionAdminResponse splitShard( String name, String shard, String ranges, SolrServer server ) throws SolrServerException, IOException
   {
+    return splitShard(name, shard, ranges, server, null);
+  }
+
+  public static CollectionAdminResponse splitShard( String name, String shard, String ranges, SolrServer server,
+                                                    String asyncId) throws SolrServerException, IOException
+  {
     SplitShard req = new SplitShard();
     req.setCollectionName(name);
     req.setShardName(shard);
     req.setRanges(ranges);
+    req.setAsyncId(asyncId);
     return req.process( server );
   }
-  public static CollectionAdminResponse splitShard( String name, String shard, SolrServer server ) throws SolrServerException, IOException
+
+  public static CollectionAdminResponse splitShard(String name, String shard, SolrServer server)
+      throws SolrServerException, IOException {
+    return splitShard(name, shard, null, server, null);
+  }
+
+  public static CollectionAdminResponse splitShard( String name, String shard, SolrServer server,
+                                                    String asyncId ) throws SolrServerException, IOException
   {
-    return splitShard(name, shard, null, server);
+    return splitShard(name, shard, null, server, asyncId);
   }
 
   public static CollectionAdminResponse deleteShard( String name, String shard, SolrServer server ) throws SolrServerException, IOException

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Mon Mar 17 21:29:03 2014
@@ -42,6 +42,7 @@ public interface CollectionParams 
     ADDROLE,
     REMOVEROLE,
     CLUSTERPROP,
+    REQUESTSTATUS,
     ADDREPLICA;
     
     public static CollectionAction get( String p )

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java Mon Mar 17 21:29:03 2014
@@ -91,7 +91,9 @@ public abstract class CoreAdminParams
   public final static String RANGES = "ranges";
   
   public static final String ROLES = "roles";
-  
+
+  public static final String REQUESTID = "requestid";
+
   public static final String CORE_NODE_NAME = "coreNodeName";
   
   /** Prefix for core property name=value pair **/
@@ -128,7 +130,8 @@ public abstract class CoreAdminParams
     REQUESTAPPLYUPDATES,
     LOAD_ON_STARTUP,
     TRANSIENT,
-    OVERSEEROP;
+    OVERSEEROP,
+    REQUESTSTATUS;
     
     public static CoreAdminAction get( String p )
     {