You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by an...@apache.org on 2014/03/14 09:01:20 UTC

svn commit: r1577444 [1/2] - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/core/ core/src/java/org/apache/solr/handler/admin/ core/src/test/org/apache/solr/cloud/ core/src/test/org/apache/solr/handler/...

Author: anshum
Date: Fri Mar 14 08:01:18 2014
New Revision: 1577444

URL: http://svn.apache.org/r1577444
Log:
SOLR-5477: Async execution of OverseerCollectionProcessor tasks

Added:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/admin/CoreAdminRequestStatusTest.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Fri Mar 14 08:01:18 2014
@@ -119,6 +119,8 @@ New Features
 * SOLR-5653: Create a RestManager to provide REST API endpoints for
   reconfigurable plugins. (Tim Potter, Steve Rowe)
 
+* SOLR-5477: Async execution of OverseerCollectionProcessor(CollectionsAPI)
+  tasks. (Anshum Gupta)
 
 Bug Fixes
 ----------------------

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java?rev=1577444&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java Fri Mar 14 08:01:18 2014
@@ -0,0 +1,232 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A distributed map.
+ * This supports basic map functions (e.g. get, put, contains) for interactions with zk that
+ * don't have to be ordered, unlike DistributedQueue.
+ */
+public class DistributedMap {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(DistributedMap.class);
+
+  private static long DEFAULT_TIMEOUT = 5*60*1000;
+
+  private final String dir;
+
+  private SolrZkClient zookeeper;
+  private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+  private final String prefix = "mn-";
+
+  private final String response_prefix = "mnr-" ;
+
+  public DistributedMap(SolrZkClient zookeeper, String dir, List<ACL> acl) {
+    this.dir = dir;
+
+    ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
+    try {
+      cmdExecutor.ensureExists(dir, zookeeper);
+    } catch (KeeperException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    }
+
+    if (acl != null) {
+      this.acl = acl;
+    }
+    this.zookeeper = zookeeper;
+  }
+
+  private class LatchChildWatcher implements Watcher {
+
+    Object lock = new Object();
+    private WatchedEvent event = null;
+
+    public LatchChildWatcher() {}
+
+    public LatchChildWatcher(Object lock) {
+      this.lock = lock;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: "
+          + event.getState() + " type " + event.getType());
+      synchronized (lock) {
+        this.event = event;
+        lock.notifyAll();
+      }
+    }
+
+    public void await(long timeout) throws InterruptedException {
+      synchronized (lock) {
+        lock.wait(timeout);
+      }
+    }
+
+    public WatchedEvent getWatchedEvent() {
+      return event;
+    }
+  }
+
+  /**
+   * Inserts data into zookeeper.
+   *
+   * @return the path of the created node, as returned by zookeeper
+   */
+  private String createData(String path, byte[] data, CreateMode mode)
+      throws KeeperException, InterruptedException {
+      for (;;) {
+      try {
+        return zookeeper.create(path, data, acl, mode, true);
+      } catch (KeeperException.NoNodeException e) {
+        try {
+          zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
+        } catch (KeeperException.NodeExistsException ne) {
+          // someone created it
+        }
+      }
+    }
+  }
+
+
+  public boolean put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
+    return createData(dir + "/" + prefix + trackingId, data,
+        CreateMode.PERSISTENT) != null;
+  }
+
+  /**
+   * Offer the data and wait for the response
+   *
+   */
+  public MapEvent put(String trackingId, byte[] data, long timeout) throws KeeperException,
+      InterruptedException {
+    String path = createData(dir + "/" + prefix + trackingId, data,
+        CreateMode.PERSISTENT);
+    String watchID = createData(
+        dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
+        null, CreateMode.EPHEMERAL);
+    Object lock = new Object();
+    LatchChildWatcher watcher = new LatchChildWatcher(lock);
+    synchronized (lock) {
+      if (zookeeper.exists(watchID, watcher, true) != null) {
+        watcher.await(timeout);
+      }
+    }
+    byte[] bytes = zookeeper.getData(watchID, null, null, true);
+    zookeeper.delete(watchID, -1, true);
+    return new MapEvent(watchID, bytes, watcher.getWatchedEvent());
+  }
+
+  public MapEvent get(String trackingId) throws KeeperException, InterruptedException {
+    return new MapEvent(trackingId, zookeeper.getData(dir + "/" + prefix + trackingId, null, null, true), null);
+  }
+
+  public boolean contains(String trackingId) throws KeeperException, InterruptedException {
+    return zookeeper.exists(dir + "/" + prefix + trackingId, true);
+  }
+
+  public void remove(String trackingId) throws KeeperException, InterruptedException {
+    zookeeper.delete(dir + "/" + prefix + trackingId, -1, true);
+  }
+
+  /**
+   * Helper method to clear all child nodes for a parent node.
+   */
+  public void clear() throws KeeperException, InterruptedException {
+    List<String> childNames = zookeeper.getChildren(dir, null, true);
+    for(String childName: childNames) {
+      zookeeper.delete(dir + "/" + childName, -1, true);
+    }
+
+  }
+
+  public static class MapEvent {
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((id == null) ? 0 : id.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false;
+      MapEvent other = (MapEvent) obj;
+      if (id == null) {
+        if (other.id != null) return false;
+      } else if (!id.equals(other.id)) return false;
+      return true;
+    }
+
+    private WatchedEvent event = null;
+    private String id;
+    private byte[] bytes;
+
+    MapEvent(String id, byte[] bytes, WatchedEvent event) {
+      this.id = id;
+      this.bytes = bytes;
+      this.event = event;
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void setBytes(byte[] bytes) {
+      this.bytes = bytes;
+    }
+
+    public byte[] getBytes() {
+      return bytes;
+    }
+
+    public WatchedEvent getWatchedEvent() {
+      return event;
+    }
+
+  }
+
+  
+}

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Mar 14 08:01:18 2014
@@ -81,6 +81,13 @@ public class Overseer {
     //Internal queue where overseer stores events that have not yet been published into cloudstate
     //If Overseer dies while extracting the main queue a new overseer will start from this queue 
     private final DistributedQueue workQueue;
+    // Internal map which holds the information about running tasks.
+    private final DistributedMap runningMap;
+    // Internal map which holds the information about successfully completed tasks.
+    private final DistributedMap completedMap;
+    // Internal map which holds the information about failed tasks.
+    private final DistributedMap failureMap;
+
     private Map clusterProps;
     private boolean isClosed = false;
 
@@ -88,6 +95,9 @@ public class Overseer {
       this.zkClient = reader.getZkClient();
       this.stateUpdateQueue = getInQueue(zkClient);
       this.workQueue = getInternalQueue(zkClient);
+      this.failureMap = getFailureMap(zkClient);
+      this.runningMap = getRunningMap(zkClient);
+      this.completedMap = getCompletedMap(zkClient);
       this.myId = myId;
       this.reader = reader;
       clusterProps = reader.getClusterProps();
@@ -135,7 +145,7 @@ public class Overseer {
                   }
                   zkClient.setData(ZkStateReader.CLUSTER_STATE,
                       ZkStateReader.toJSON(clusterState), true);
-                  
+
                   workQueue.poll(); // poll-ing removes the element we got by peek-ing
                 }
                 else {
@@ -1135,6 +1145,24 @@ public class Overseer {
     createOverseerNode(zkClient);
     return new DistributedQueue(zkClient, "/overseer/queue-work", null);
   }
+
+  /* Internal map for running tasks, not to be used outside of the Overseer */
+  static DistributedMap getRunningMap(final SolrZkClient zkClient) {
+    createOverseerNode(zkClient);
+    return new DistributedMap(zkClient, "/overseer/collection-map-running", null);
+  }
+
+  /* Internal map for successfully completed tasks, not to be used outside of the Overseer */
+  static DistributedMap getCompletedMap(final SolrZkClient zkClient) {
+    createOverseerNode(zkClient);
+    return new DistributedMap(zkClient, "/overseer/collection-map-completed", null);
+  }
+
+  /* Internal map for failed tasks, not to be used outside of the Overseer */
+  static DistributedMap getFailureMap(final SolrZkClient zkClient) {
+    createOverseerNode(zkClient);
+    return new DistributedMap(zkClient, "/overseer/collection-map-failure", null);
+  }
   
   /* Collection creation queue */
   static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Fri Mar 14 08:01:18 2014
@@ -61,10 +61,8 @@ import org.apache.solr.handler.component
 import org.apache.solr.handler.component.ShardResponse;
 import org.apache.solr.update.SolrIndexSplitter;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +80,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.solr.cloud.Assign.Node;
@@ -118,16 +117,22 @@ public class OverseerCollectionProcessor
 
   public static final String DELETESHARD = "deleteshard";
 
+  public static final String REQUESTSTATUS = "status";
+
   public static final String ROUTER = "router";
 
   public static final String SHARDS_PROP = "shards";
 
+  public static final String ASYNC = "async";
+
   public static final String CREATESHARD = "createshard";
 
   public static final String DELETEREPLICA = "deletereplica";
 
   public static final String MIGRATE = "migrate";
 
+  public static final String REQUESTID = "requestid";
+
   public static final String COLL_CONF = "collection.configName";
 
   public static final String COLL_PROP_PREFIX = "property.";
@@ -149,6 +154,9 @@ public class OverseerCollectionProcessor
       .getLogger(OverseerCollectionProcessor.class);
   
   private DistributedQueue workQueue;
+  private DistributedMap runningMap;
+  private DistributedMap completedMap;
+  private DistributedMap failureMap;
   
   private String myId;
 
@@ -161,15 +169,25 @@ public class OverseerCollectionProcessor
   private boolean isClosed;
   
   public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
-    this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()));
+    this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()),
+        Overseer.getRunningMap(zkStateReader.getZkClient()),
+        Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient()));
   }
 
-  protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath, DistributedQueue workQueue) {
+  protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler,
+                                        String adminPath,
+                                        DistributedQueue workQueue,
+                                        DistributedMap runningMap,
+                                        DistributedMap completedMap,
+                                        DistributedMap failureMap) {
     this.zkStateReader = zkStateReader;
     this.myId = myId;
     this.shardHandler = shardHandler;
     this.adminPath = adminPath;
     this.workQueue = workQueue;
+    this.runningMap = runningMap;
+    this.completedMap = completedMap;
+    this.failureMap = failureMap;
   }
   
   @Override
@@ -200,11 +218,35 @@ public class OverseerCollectionProcessor
            
            QueueEvent head = workQueue.peek(true);
            final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+
+           final String asyncId = (message.containsKey(ASYNC) && message.get(ASYNC) != null) ? (String) message.get(ASYNC) : null;
+
+           try {
+             if(message.containsKey(ASYNC) && message.get(ASYNC) != null && !runningMap.contains(message.getStr(ASYNC)))
+              runningMap.put(asyncId, null);
+           } catch (KeeperException.NodeExistsException e) {
+             // Just catch and do nothing. The runningMap.contains(..) check ensures that this is the only
+             // entry point into the runningMap.
+             // NOTE: Make sure to handle it as soon as OCP gets distributed/multi-threaded.
+           }
+
            log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
            final String operation = message.getStr(QUEUE_OPERATION);
            SolrResponse response = processMessage(message, operation);
+
            head.setBytes(SolrResponse.serializable(response));
+           if (!operation.equals(REQUESTSTATUS) && asyncId != null) {
+             if(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null) {
+               failureMap.put(asyncId, null);
+             } else {
+               completedMap.put(asyncId, null);
+             }
+           }
+           if(asyncId != null)
+            runningMap.remove(asyncId);
+
            workQueue.remove(head);
+
           log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
         } catch (KeeperException e) {
           if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -412,9 +454,9 @@ public class OverseerCollectionProcessor
         processRoleCommand(message, operation);
       } else if (ADDREPLICA.isEqual(operation))  {
         addReplica(zkStateReader.getClusterState(), message, results);
-      }
-
-      else {
+      } else if (REQUESTSTATUS.equals(operation)) {
+        requestStatus(message, results);
+      } else {
         throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
             + operation);
       }
@@ -764,13 +806,7 @@ public class OverseerCollectionProcessor
 
     }
 
-    ShardResponse srsp;
-    do {
-      srsp = shardHandler.takeCompletedOrError();
-      if (srsp != null) {
-        processResponse(results, srsp);
-      }
-    } while (srsp != null);
+    processResponses(results);
 
     log.info("Finished create command on all shards for collection: "
         + collectionName);
@@ -931,6 +967,9 @@ public class OverseerCollectionProcessor
       // the only side effect of this is that the sub shard may end up having more replicas than we want
       collectShardResponses(results, false, null);
 
+      String asyncId = message.getStr(ASYNC);
+      HashMap<String, String> requestMap = new HashMap<String, String>();
+
       for (int i=0; i<subRanges.size(); i++)  {
         String subSlice = subSlices.get(i);
         String subShardName = subShardNames.get(i);
@@ -957,12 +996,15 @@ public class OverseerCollectionProcessor
         params.set(CoreAdminParams.NAME, subShardName);
         params.set(CoreAdminParams.COLLECTION, collectionName);
         params.set(CoreAdminParams.SHARD, subSlice);
+        setupAsyncRequest(asyncId, requestMap, params, nodeName);
         addPropertyParams(message, params);
         sendShardRequest(nodeName, params);
       }
 
       collectShardResponses(results, true,
-          "SPLTSHARD failed to create subshard leaders");
+          "SPLITSHARD failed to create subshard leaders");
+
+      completeAsyncRequest(asyncId, requestMap, results);
 
       for (String subShardName : subShardNames) {
         // wait for parent leader to acknowledge the sub-shard core
@@ -975,12 +1017,18 @@ public class OverseerCollectionProcessor
         cmd.setState(ZkStateReader.ACTIVE);
         cmd.setCheckLive(true);
         cmd.setOnlyIfLeader(true);
-        sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
+
+        ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
+        setupAsyncRequest(asyncId, requestMap, p, nodeName);
+
+        sendShardRequest(nodeName, p);
       }
 
       collectShardResponses(results, true,
-          "SPLTSHARD timed out waiting for subshard leaders to come up");
-      
+          "SPLITSHARD timed out waiting for subshard leaders to come up");
+
+      completeAsyncRequest(asyncId, requestMap, results);
+
       log.info("Successfully created all sub-shards for collection "
           + collectionName + " parent shard: " + slice + " on: " + parentShardLeader);
 
@@ -996,9 +1044,12 @@ public class OverseerCollectionProcessor
         params.add(CoreAdminParams.TARGET_CORE, subShardName);
       }
       params.set(CoreAdminParams.RANGES, rangesStr);
+      setupAsyncRequest(asyncId, requestMap, params, parentShardLeader.getNodeName());
 
       sendShardRequest(parentShardLeader.getNodeName(), params);
+
       collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command");
+      completeAsyncRequest(asyncId, requestMap, results);
 
       log.info("Index on shard: " + nodeName + " split into two successfully");
 
@@ -1012,12 +1063,16 @@ public class OverseerCollectionProcessor
         params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
         params.set(CoreAdminParams.NAME, subShardName);
 
+        setupAsyncRequest(asyncId, requestMap, params, nodeName);
+
         sendShardRequest(nodeName, params);
       }
 
       collectShardResponses(results, true,
           "SPLITSHARD failed while asking sub shard leaders to apply buffered updates");
 
+      completeAsyncRequest(asyncId, requestMap, results);
+
       log.info("Successfully applied buffered updates on : " + subShardNames);
 
       // Replica creation for the new Slices
@@ -1067,6 +1122,12 @@ public class OverseerCollectionProcessor
           params.set(CoreAdminParams.NAME, shardName);
           params.set(CoreAdminParams.COLLECTION, collectionName);
           params.set(CoreAdminParams.SHARD, sliceName);
+          if(asyncId != null) {
+            String requestId = asyncId + Math.abs(System.nanoTime());
+            params.set(ASYNC, requestId);
+            requestMap.put(subShardNodeName, requestId);
+          }
+
           addPropertyParams(message, params);
           // TODO:  Figure the config used by the parent shard and use it.
           //params.set("collection.configName", configName);
@@ -1086,12 +1147,19 @@ public class OverseerCollectionProcessor
           cmd.setState(ZkStateReader.RECOVERING);
           cmd.setCheckLive(true);
           cmd.setOnlyIfLeader(true);
-          sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
+          ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
+
+          setupAsyncRequest(asyncId, requestMap, p, nodeName);
+
+          sendShardRequest(nodeName, p);
+
         }
       }
 
       collectShardResponses(results, true,
-          "SPLTSHARD failed to create subshard replicas or timed out waiting for them to come up");
+          "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up");
+
+      completeAsyncRequest(asyncId, requestMap, results);
 
       log.info("Successfully created all replica shards for all sub-slices " + subSlices);
 
@@ -1211,7 +1279,40 @@ public class OverseerCollectionProcessor
     } while (srsp != null);
   }
 
-  
+  private void requestStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+    log.info("Request status invoked");
+    String requestId = message.getStr(REQUESTID);
+
+    // Special taskId (-1), clears up the request state maps.
+    if(requestId.equals("-1")) {
+      completedMap.clear();
+      failureMap.clear();
+      return;
+    }
+
+    if(completedMap.contains(requestId)) {
+      SimpleOrderedMap success = new SimpleOrderedMap();
+      success.add("state", "completed");
+      success.add("msg", "found " + requestId + " in completed tasks");
+      results.add("status", success);
+    } else if (runningMap.contains(requestId)) {
+      SimpleOrderedMap success = new SimpleOrderedMap();
+      success.add("state", "running");
+      success.add("msg", "found " + requestId + " in submitted tasks");
+      results.add("status", success);
+    } else if (failureMap.contains(requestId)) {
+      SimpleOrderedMap success = new SimpleOrderedMap();
+      success.add("state", "failed");
+      success.add("msg", "found " + requestId + " in failed tasks");
+      results.add("status", success);
+    } else {
+      SimpleOrderedMap failure = new SimpleOrderedMap();
+      failure.add("state", "notfound");
+      failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue");
+      results.add("status", failure);
+    }
+  }
+
   private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
     log.info("Delete shard invoked");
     String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
@@ -1222,7 +1323,7 @@ public class OverseerCollectionProcessor
     if (slice == null) {
       if(clusterState.hasCollection(collection)) {
         throw new SolrException(ErrorCode.BAD_REQUEST,
-            "No shard with the specified name exists: " + slice);
+            "No shard with the specified name exists: " + slice.getName());
       } else {
         throw new SolrException(ErrorCode.BAD_REQUEST,
             "No collection with the specified name exists: " + collection);
@@ -1242,13 +1343,7 @@ public class OverseerCollectionProcessor
       params.set(CoreAdminParams.DELETE_INDEX, "true");
       sliceCmd(clusterState, params, null, slice);
 
-      ShardResponse srsp;
-      do {
-        srsp = shardHandler.takeCompletedOrError();
-        if (srsp != null) {
-          processResponse(results, srsp);
-        }
-      } while (srsp != null);
+      processResponses(results);
 
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
           Overseer.REMOVESHARD, ZkStateReader.COLLECTION_PROP, collection,
@@ -1314,21 +1409,29 @@ public class OverseerCollectionProcessor
           "No active slices available in target collection: " + targetCollection + "for given split.key: " + splitKey);
     }
 
+    String asyncId = null;
+    if(message.containsKey(ASYNC) && message.get(ASYNC) != null)
+      asyncId = message.getStr(ASYNC);
+
     for (Slice sourceSlice : sourceSlices) {
       for (Slice targetSlice : targetSlices) {
         log.info("Migrating source shard: {} to target shard: {} for split.key = " + splitKey, sourceSlice, targetSlice);
-        migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey, timeout, results);
+        migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey, timeout, results, asyncId);
       }
     }
   }
 
-  private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice, DocCollection targetCollection, Slice targetSlice, String splitKey, int timeout, NamedList results) throws KeeperException, InterruptedException {
+  private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
+                          DocCollection targetCollection, Slice targetSlice,
+                          String splitKey, int timeout,
+                          NamedList results, String asyncId) throws KeeperException, InterruptedException {
     String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
     if (clusterState.hasCollection(tempSourceCollectionName)) {
       log.info("Deleting temporary collection: " + tempSourceCollectionName);
       Map<String, Object> props = ZkNodeProps.makeMap(
           QUEUE_OPERATION, DELETECOLLECTION,
           "name", tempSourceCollectionName);
+
       try {
         deleteCollection(new ZkNodeProps(props), results);
       } catch (Exception e) {
@@ -1350,15 +1453,23 @@ public class OverseerCollectionProcessor
     log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
 
     Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
+    // For tracking async calls.
+    HashMap<String, String> requestMap = new HashMap<String, String>();
 
     log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
         + targetLeader.getStr("core") + " to buffer updates");
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTBUFFERUPDATES.toString());
     params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+    String nodeName = targetLeader.getNodeName();
+    setupAsyncRequest(asyncId, requestMap, params, nodeName);
+
     sendShardRequest(targetLeader.getNodeName(), params);
+
     collectShardResponses(results, true, "MIGRATE failed to request node to buffer updates");
 
+    completeAsyncRequest(asyncId, requestMap, results);
+
     ZkNodeProps m = new ZkNodeProps(
         Overseer.QUEUE_OPERATION, Overseer.ADD_ROUTING_RULE,
         COLLECTION_PROP, sourceCollection.getName(),
@@ -1405,6 +1516,11 @@ public class OverseerCollectionProcessor
         NUM_SLICES, 1,
         COLL_CONF, configName,
         CREATE_NODE_SET, sourceLeader.getNodeName());
+    if(asyncId != null) {
+      String internalAsyncId = asyncId + Math.abs(System.nanoTime());
+      props.put(ASYNC, internalAsyncId);
+    }
+
     log.info("Creating temporary collection: " + props);
     createCollection(clusterState, new ZkNodeProps(props), results);
     // refresh cluster state
@@ -1437,8 +1553,13 @@ public class OverseerCollectionProcessor
     params.set(CoreAdminParams.RANGES, splitRange.toString());
     params.set("split.key", splitKey);
 
-    sendShardRequest(sourceLeader.getNodeName(), params);
+    String tempNodeName = sourceLeader.getNodeName();
+
+    setupAsyncRequest(asyncId, requestMap, params, tempNodeName);
+
+    sendShardRequest(tempNodeName, params);
     collectShardResponses(results, true, "MIGRATE failed to invoke SPLIT core admin command");
+    completeAsyncRequest(asyncId, requestMap, results);
 
     log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
         tempSourceCollectionName, targetLeader.getNodeName());
@@ -1448,7 +1569,13 @@ public class OverseerCollectionProcessor
     params.set(CoreAdminParams.NAME, tempCollectionReplica2);
     params.set(CoreAdminParams.COLLECTION, tempSourceCollectionName);
     params.set(CoreAdminParams.SHARD, tempSourceSlice.getName());
+
+    setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
     sendShardRequest(targetLeader.getNodeName(), params);
+    collectShardResponses(results, true,
+        "MIGRATE failed to create replica of temporary collection in target leader node.");
+
+    completeAsyncRequest(asyncId, requestMap, results);
 
     coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
         zkStateReader.getBaseUrlForNodeName(targetLeader.getNodeName()), tempCollectionReplica2);
@@ -1458,14 +1585,19 @@ public class OverseerCollectionProcessor
     cmd.setCoreName(tempSourceLeader.getStr("core"));
     cmd.setNodeName(targetLeader.getNodeName());
     cmd.setCoreNodeName(coreNodeName);
-    cmd.setState(ZkStateReader.ACTIVE); // todo introduce asynchronous actions
+    cmd.setState(ZkStateReader.ACTIVE);
     cmd.setCheckLive(true);
     cmd.setOnlyIfLeader(true);
-    sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()));
+    params = new ModifiableSolrParams(cmd.getParams());
+
+    setupAsyncRequest(asyncId, requestMap, params, tempSourceLeader.getNodeName());
+
+    sendShardRequest(tempSourceLeader.getNodeName(), params);
 
     collectShardResponses(results, true,
         "MIGRATE failed to create temp collection replica or timed out waiting for them to come up");
 
+    completeAsyncRequest(asyncId, requestMap, results);
     log.info("Successfully created replica of temp source collection on target leader node");
 
     log.info("Requesting merge of temp source collection replica to target leader");
@@ -1473,18 +1605,27 @@ public class OverseerCollectionProcessor
     params.set(CoreAdminParams.ACTION, CoreAdminAction.MERGEINDEXES.toString());
     params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
     params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
+
+    setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName()); // same node the request below is sent to
+
     sendShardRequest(targetLeader.getNodeName(), params);
     collectShardResponses(results, true,
         "MIGRATE failed to merge " + tempCollectionReplica2 + " to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName());
 
+    completeAsyncRequest(asyncId, requestMap, results);
+
     log.info("Asking target leader to apply buffered updates");
     params = new ModifiableSolrParams();
     params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
     params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+    setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
+
     sendShardRequest(targetLeader.getNodeName(), params);
     collectShardResponses(results, true,
         "MIGRATE failed to request node to apply buffered updates");
 
+    completeAsyncRequest(asyncId, requestMap, results);
+
     try {
       log.info("Deleting temporary collection: " + tempSourceCollectionName);
       props = ZkNodeProps.makeMap(
@@ -1497,6 +1638,21 @@ public class OverseerCollectionProcessor
     }
   }
 
+  private void completeAsyncRequest(String asyncId, HashMap<String, String> requestMap, NamedList results) {
+    if (asyncId == null) return; // no async id: skip waiting and leave the map alone
+    // Wait for every tracked call, then empty the tracking map.
+    waitForAsyncCallsToComplete(requestMap, results);
+    requestMap.clear();
+  }
+
+  private void setupAsyncRequest(String asyncId, HashMap<String, String> requestMap, ModifiableSolrParams params, String nodeName) {
+    if (asyncId == null) return; // no async id: leave params and the tracking map untouched
+    // Sub-request id = parent async id suffixed with a nanoTime reading.
+    final String subRequestId = asyncId + Math.abs(System.nanoTime());
+    params.set(ASYNC, subRequestId);
+    requestMap.put(nodeName, subRequestId); // keyed by node: a later id for the same node replaces this one
+  }
+
   private DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
     if (a == null || b == null || !a.overlaps(b)) {
       return null;
@@ -1542,6 +1698,11 @@ public class OverseerCollectionProcessor
       // if it does not, find best nodes to create more cores
       
       int repFactor = message.getInt( REPLICATION_FACTOR, 1);
+
+      String async = null;
+      if (message.containsKey("async"))
+        async = message.getStr("async");
+
       Integer numSlices = message.getInt(NUM_SLICES, null);
       String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
       List<String> shardNames = new ArrayList<>();
@@ -1627,6 +1788,9 @@ public class OverseerCollectionProcessor
       if (!created)
         throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully createcollection: " + message.getStr("name"));
 
+      // For tracking async calls.
+      HashMap<String, String> requestMap = new HashMap<String, String>();
+
       log.info("Creating SolrCores for new collection, shardNames {} , replicationFactor : {}", shardNames, repFactor);
       Map<String ,ShardRequest> coresToCreate = new LinkedHashMap<>();
       for (int i = 1; i <= shardNames.size(); i++) {
@@ -1662,6 +1826,11 @@ public class OverseerCollectionProcessor
           params.set(CoreAdminParams.COLLECTION, collectionName);
           params.set(CoreAdminParams.SHARD, sliceName);
           params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+          String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
+          if (replica.startsWith("http://")) replica = replica.substring(7);
+
+          setupAsyncRequest(async, requestMap, params, nodeName);
+
           addPropertyParams(message, params);
 
           ShardRequest sreq = new ShardRequest();
@@ -1689,13 +1858,9 @@ public class OverseerCollectionProcessor
         }
       }
 
-      ShardResponse srsp;
-      do {
-        srsp = shardHandler.takeCompletedOrError();
-        if (srsp != null) {
-          processResponse(results, srsp);
-        }
-      } while (srsp != null);
+      processResponses(results);
+
+      completeAsyncRequest(async, requestMap, results);
 
       log.info("Finished create command on all shards for collection: "
           + collectionName);
@@ -1829,6 +1994,18 @@ public class OverseerCollectionProcessor
       "ADDREPLICA failed to create replica");
   }
 
+
+  /** Passes each response from shardHandler.takeCompletedOrError() to processResponse until null is returned. */
+  private void processResponses(NamedList results) {
+    // A null response is the loop's only exit condition.
+    ShardResponse next = shardHandler.takeCompletedOrError();
+    while (next != null) {
+      processResponse(results, next);
+      next = shardHandler.takeCompletedOrError();
+    }
+  }
+
+
   private String createConfNode(String coll, ZkNodeProps message, boolean isLegacyCloud) throws KeeperException, InterruptedException {
     String configName = message.getStr(OverseerCollectionProcessor.COLL_CONF);
     if(configName == null){
@@ -1873,14 +2050,8 @@ public class OverseerCollectionProcessor
       Slice slice = entry.getValue();
       sliceCmd(clusterState, params, stateMatcher, slice);
     }
-    
-    ShardResponse srsp;
-    do {
-      srsp = shardHandler.takeCompletedOrError();
-      if (srsp != null) {
-        processResponse(results, srsp);
-      }
-    } while (srsp != null);
+
+    processResponses(results);
 
   }
 
@@ -1950,4 +2121,71 @@ public class OverseerCollectionProcessor
     return isClosed;
   }
 
+  private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
+    for (Map.Entry<String, String> call : requestMap.entrySet()) { // key: node name, value: request id
+      log.debug("I am Waiting for : " + call.getKey() + "/" + call.getValue());
+      results.add(call.getValue(), waitForCoreAdminAsyncCallToComplete(call.getKey(), call.getValue()));
+    }
+  }
+
+  /**
+   * Polls the core admin REQUESTSTATUS action on {@code nodeName}, pausing one second between
+   * polls, until task {@code requestId} reports "completed" or "failed"; returns that response.
+   * @throws SolrException if the status is missing or unrecognised, if the task is still
+   *         "notfound" after five retries, or if this thread is interrupted while waiting
+   */
+  private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
+    params.set(CoreAdminParams.REQUESTID, requestId);
+    params.set("qt", adminPath);
+    int counter = 0;
+    while (true) {
+      ShardRequest sreq = new ShardRequest();
+      sreq.purpose = 1;
+      String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
+      if (replica.startsWith("http://")) replica = replica.substring(7);
+      sreq.shards = new String[] {replica};
+      sreq.actualShards = sreq.shards;
+      sreq.params = params;
+      shardHandler.submit(sreq, replica, sreq.params);
+      ShardResponse srsp;
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          NamedList results = new NamedList();
+          processResponse(results, srsp);
+          NamedList status = srsp.getSolrResponse().getResponse();
+          // Constant-first equals(): a response without STATUS reaches the final else, not an NPE.
+          String r = (String) status.get("STATUS");
+          if ("completed".equals(r)) {
+            log.debug("The task is COMPLETED, returning");
+            return status;
+          } else if ("failed".equals(r)) {
+            // TODO: Improve this. Get more information.
+            log.debug("The task is FAILED, returning");
+            return status;
+          } else if ("running".equals(r)) {
+            log.debug("The task is still RUNNING, continuing to wait.");
+          } else if ("notfound".equals(r)) {
+            log.debug("The task is notfound, retry");
+            if (counter++ >= 5) {
+              throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request: " + r
+                  + " retried " + counter + " times");
+            }
+          } else {
+            throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request " + r);
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the flag rather than swallowing it
+            throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted waiting for task " + requestId, e);
+          }
+          if ("notfound".equals(r)) break; // resubmit now; a "running" poll drains the handler first
+        }
+      } while (srsp != null);
+    }
+  }
+
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Mar 14 08:01:18 2014
@@ -92,6 +92,10 @@ public final class ZkController {
   
   private final DistributedQueue overseerJobQueue;
   private final DistributedQueue overseerCollectionQueue;
+
+  private final DistributedMap overseerRunningMap;
+  private final DistributedMap overseerCompletedMap;
+  private final DistributedMap overseerFailureMap;
   
   public static final String CONFIGS_ZKNODE = "/configs";
 
@@ -279,6 +283,9 @@ public final class ZkController {
     
     this.overseerJobQueue = Overseer.getInQueue(zkClient);
     this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
+    this.overseerRunningMap = Overseer.getRunningMap(zkClient);
+    this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
+    this.overseerFailureMap = Overseer.getFailureMap(zkClient);
     cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
     leaderElector = new LeaderElector(zkClient);
     zkStateReader = new ZkStateReader(zkClient);
@@ -1583,6 +1590,18 @@ public final class ZkController {
   public DistributedQueue getOverseerCollectionQueue() {
     return overseerCollectionQueue;
   }
+
+  public DistributedMap getOverseerRunningMap() {
+    return overseerRunningMap; // final field, initialised from Overseer.getRunningMap(zkClient)
+  }
+
+  public DistributedMap getOverseerCompletedMap() {
+    return overseerCompletedMap; // final field, initialised from Overseer.getCompletedMap(zkClient)
+  }
+
+  public DistributedMap getOverseerFailureMap() {
+    return overseerFailureMap; // final field, initialised from Overseer.getFailureMap(zkClient)
+  }
   
   public int getClientTimeout() {
     return clientTimeout;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java Fri Mar 14 08:01:18 2014
@@ -368,6 +368,13 @@ public class CoreContainer {
     }
 
     try {
+      coreAdminHandler.shutdown();
+    } catch (Exception e) {
+      log.warn("Error shutting down CoreAdminHandler. Continuing to shutdown CoreContainer.");
+      e.printStackTrace();
+    }
+
+    try {
       // First wake up the closer thread, it'll terminate almost immediately since it checks isShutDown.
       synchronized (solrCores.getModifyLock()) {
         solrCores.getModifyLock().notifyAll(); // wake up anyone waiting

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Fri Mar 14 08:01:18 2014
@@ -18,6 +18,7 @@ package org.apache.solr.handler.admin;
  */
 
 import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATESHARD;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
@@ -25,6 +26,7 @@ import static org.apache.solr.cloud.Over
 import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
 import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
@@ -51,6 +53,7 @@ import org.apache.solr.client.solrj.requ
 import org.apache.solr.cloud.DistributedQueue.QueueEvent;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.OverseerCollectionProcessor;
+import org.apache.solr.cloud.OverseerSolrResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
@@ -199,6 +202,10 @@ public class CollectionsHandler extends 
         this.handleAddReplica(req, rsp);
         break;
       }
+      case REQUESTSTATUS: {
+        this.handleRequestStatus(req, rsp);
+        break;
+      }
       default: {
           throw new RuntimeException("Unknown action: " + action);
       }
@@ -236,6 +243,16 @@ public class CollectionsHandler extends 
 
   public static long DEFAULT_ZK_TIMEOUT = 180*1000;
 
+  private void handleRequestStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+    log.debug("REQUESTSTATUS action invoked: " + req.getParamString());
+    req.getParams().required().check(REQUESTID); // the request id is mandatory
+    final String requestId = req.getParams().get(REQUESTID);
+    Map<String, Object> statusMessage = new HashMap<String, Object>();
+    statusMessage.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.REQUESTSTATUS);
+    statusMessage.put(REQUESTID, requestId);
+    handleResponse(OverseerCollectionProcessor.REQUESTSTATUS, new ZkNodeProps(statusMessage), rsp);
+  }
+
   private void handleResponse(String operation, ZkNodeProps m,
                               SolrQueryResponse rsp) throws KeeperException, InterruptedException {
     handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
@@ -244,6 +261,35 @@ public class CollectionsHandler extends 
   private void handleResponse(String operation, ZkNodeProps m,
       SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
     long time = System.nanoTime();
+
+     if(m.containsKey(ASYNC) && m.get(ASYNC) != null) {
+ 
+       String asyncId = m.getStr(ASYNC);
+ 
+       if(asyncId.equals("-1")) {
+         throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
+       }
+ 
+       NamedList<String> r = new NamedList<>();
+ 
+       if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
+           coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
+           coreContainer.getZkController().getOverseerRunningMap().contains(asyncId)) {
+         r.add("error", "Task with the same requestid already exists.");
+ 
+       } else {
+         coreContainer.getZkController().getOverseerCollectionQueue()
+             .offer(ZkStateReader.toJSON(m));
+ 
+       }
+       r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
+       SolrResponse response = new OverseerSolrResponse(r);
+ 
+       rsp.getValues().addAll(response.getResponse());
+ 
+       return;
+     }
+
     QueueEvent event = coreContainer.getZkController()
         .getOverseerCollectionQueue()
         .offer(ZkStateReader.toJSON(m), timeout);
@@ -368,6 +414,7 @@ public class CollectionsHandler extends 
          MAX_SHARDS_PER_NODE,
         CREATE_NODE_SET ,
         SHARDS_PROP,
+        ASYNC,
         "router.");
 
     copyPropertiesIfNotNull(req.getParams(), props);
@@ -380,7 +427,7 @@ public class CollectionsHandler extends 
     log.info("Remove replica: " + req.getParamString());
     req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP, "replica");
     Map<String, Object> map = makeMap(QUEUE_OPERATION, DELETEREPLICA);
-    copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica");
+    copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica", ASYNC);
     ZkNodeProps m = new ZkNodeProps(map);
     handleResponse(DELETEREPLICA, m, rsp);
   }
@@ -394,7 +441,7 @@ public class CollectionsHandler extends 
       throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections" );
 
     Map<String, Object> map = makeMap(QUEUE_OPERATION, CREATESHARD);
-    copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR,CREATE_NODE_SET);
+    copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR,CREATE_NODE_SET, ASYNC);
     copyPropertiesIfNotNull(req.getParams(), map);
     ZkNodeProps m = new ZkNodeProps(map);
     handleResponse(CREATESHARD, m, rsp);
@@ -485,6 +532,10 @@ public class CollectionsHandler extends 
     if (rangesStr != null)  {
       props.put(CoreAdminParams.RANGES, rangesStr);
     }
+
+    if (req.getParams().get(ASYNC) != null)
+      props.put(ASYNC, req.getParams().get(ASYNC));
+
     copyPropertiesIfNotNull(req.getParams(), props);
 
     ZkNodeProps m = new ZkNodeProps(props);
@@ -497,7 +548,7 @@ public class CollectionsHandler extends 
     req.getParams().required().check("collection", "split.key", "target.collection");
     Map<String,Object> props = new HashMap<>();
     props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.MIGRATE);
-    copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout");
+    copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout", ASYNC);
     ZkNodeProps m = new ZkNodeProps(props);
     handleResponse(OverseerCollectionProcessor.MIGRATE, m, rsp, DEFAULT_ZK_TIMEOUT * 20);
   }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Fri Mar 14 08:01:18 2014
@@ -17,21 +17,8 @@
 
 package org.apache.solr.handler.admin;
 
-import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.lucene.index.DirectoryReader;
@@ -55,6 +42,7 @@ import org.apache.solr.common.params.Cor
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.core.CloseHook;
@@ -75,14 +63,31 @@ import org.apache.solr.update.SplitIndex
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.processor.UpdateRequestProcessor;
 import org.apache.solr.update.processor.UpdateRequestProcessorChain;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.NumberUtils;
 import org.apache.solr.util.RefCounted;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
 
 /**
  *
@@ -91,6 +96,24 @@ import com.google.common.collect.Lists;
 public class CoreAdminHandler extends RequestHandlerBase {
   protected static Logger log = LoggerFactory.getLogger(CoreAdminHandler.class);
   protected final CoreContainer coreContainer;
+  protected static HashMap<String, Map<String, TaskObject>> requestStatusMap =
+      new HashMap<String,Map<String, TaskObject>>();
+
+  protected ExecutorService parallelExecutor = null;
+
+  protected static int MAX_TRACKED_REQUESTS = 100;
+  public static String RUNNING = "running";
+  public static String COMPLETED = "completed";
+  public static String FAILED = "failed";
+  public static String RESPONSE = "Response";
+  public static String RESPONSE_STATUS = "STATUS";
+  public static String RESPONSE_MESSAGE = "msg";
+
+  static {
+    requestStatusMap.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
+    requestStatusMap.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
+    requestStatusMap.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
+  }
 
   public CoreAdminHandler() {
     super();
@@ -136,6 +159,18 @@ public class CoreAdminHandler extends Re
               "Core container instance missing");
     }
     //boolean doPersist = false;
+    String taskId = req.getParams().get("async");
+    TaskObject taskObject = new TaskObject(taskId);
+
+    if(taskId != null) {
+      // Put the tasks into the maps for tracking
+      if (getMap(RUNNING).containsKey(taskId) || getMap(COMPLETED).containsKey(taskId) || getMap(FAILED).containsKey(taskId)) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "Duplicate request with the same requestid found.");
+      }
+
+      addTask(RUNNING, taskObject);
+    }
 
     // Pick the action
     SolrParams params = req.getParams();
@@ -147,6 +182,19 @@ public class CoreAdminHandler extends Re
         this.handleCustomAction(req, rsp);
       }
     }
+
+    if (taskId == null) {
+      handleRequestInternal(req, rsp, action);
+    } else {
+      ParallelCoreAdminHandlerThread parallelHandlerThread = new ParallelCoreAdminHandlerThread(req, rsp, action, taskObject);
+      if(parallelExecutor == null || parallelExecutor.isShutdown())
+        parallelExecutor = Executors.newFixedThreadPool(50,
+                  new DefaultSolrThreadFactory("parallelCoreAdminExecutor"));
+        parallelExecutor.execute(parallelHandlerThread);
+    }
+  }
+
+  protected void handleRequestInternal(SolrQueryRequest req, SolrQueryResponse rsp, CoreAdminAction action) throws Exception {
     if (action != null) {
       switch (action) {
         case CREATE: {
@@ -199,17 +247,17 @@ public class CoreAdminHandler extends Re
           this.handleWaitForStateAction(req, rsp);
           break;
         }
-        
+
         case REQUESTRECOVERY: {
           this.handleRequestRecoveryAction(req, rsp);
           break;
         }
-        
+
         case REQUESTSYNCSHARD: {
           this.handleRequestSyncAction(req, rsp);
           break;
         }
-        
+
         // todo : Can this be done by the regular RecoveryStrategy route?
         case REQUESTAPPLYUPDATES: {
           this.handleRequestApplyUpdatesAction(req, rsp);
@@ -219,6 +267,10 @@ public class CoreAdminHandler extends Re
           this.handleRequestBufferUpdatesAction(req, rsp);
           break;
         }
+        case REQUESTSTATUS: {
+          this.handleRequestActionStatus(req, rsp);
+          break;
+        }
         case OVERSEEROP:{
           ZkController zkController = coreContainer.getZkController();
           if(zkController != null){
@@ -240,7 +292,7 @@ public class CoreAdminHandler extends Re
     rsp.setHttpCaching(false);
   }
 
-  
+
   /**
    * Handle the core admin SPLIT action.
    */
@@ -755,6 +807,28 @@ public class CoreAdminHandler extends Re
   }
 
   /**
+   * Handle "REQUESTSTATUS" action
+   */
+  protected void handleRequestActionStatus(SolrQueryRequest req, SolrQueryResponse rsp) {
+    String requestId = req.getParams().get(CoreAdminParams.REQUESTID);
+    log.info("Checking request status for : " + requestId);
+
+    TaskObject task; // one get() per map: containsKey()+get() could NPE if addTask() evicts in between
+    if (mapContainsTask(RUNNING, requestId)) {
+      rsp.add(RESPONSE_STATUS, RUNNING);
+    } else if ((task = getTask(COMPLETED, requestId)) != null) {
+      rsp.add(RESPONSE_STATUS, COMPLETED);
+      rsp.add(RESPONSE, task.getRspObject());
+    } else if ((task = getTask(FAILED, requestId)) != null) {
+      rsp.add(RESPONSE_STATUS, FAILED);
+      rsp.add(RESPONSE, task.getRspObject());
+    } else {
+      rsp.add(RESPONSE_STATUS, "notfound");
+      rsp.add(RESPONSE_MESSAGE, "No task found in running, completed or failed tasks");
+    }
+  }
+
+  /**
    * Handle "SWAP" action
    */
   protected void handleSwapAction(SolrQueryRequest req, SolrQueryResponse rsp) {
@@ -1172,4 +1246,123 @@ public class CoreAdminHandler extends Re
   public String getSource() {
     return "$URL$";
   }
+
+  /**
+   * Class to implement multi-threaded CoreAdminHandler behaviour.
+   * This accepts all of the context from handleRequestBody.
+   */
+  protected class ParallelCoreAdminHandlerThread implements Runnable {
+    // Request context handed over by the caller that schedules this task.
+    SolrQueryRequest req;
+    SolrQueryResponse rsp;
+    CoreAdminAction action;
+    TaskObject taskObject;
+
+    public ParallelCoreAdminHandlerThread (SolrQueryRequest req, SolrQueryResponse rsp,
+                                           CoreAdminAction action, TaskObject taskObject){
+      this.req = req;
+      this.rsp = rsp;
+      this.action = action;
+      this.taskObject = taskObject;
+    }
+
+    /** Runs the action, stores its outcome on the task and files the task as completed or failed. */
+    @Override
+    public void run() {
+      boolean succeeded = false;
+      try {
+        handleRequestInternal(req, rsp, action);
+        taskObject.setRspObject(rsp);
+        succeeded = true;
+      } catch (Exception e) {
+        taskObject.setRspObjectFromException(e);
+      } finally {
+        // File the outcome before leaving RUNNING so a status poll never sees "notfound"; anything
+        // other than a normal return (including an Error) is recorded as FAILED.
+        addTask(succeeded ? COMPLETED : FAILED, taskObject, true);
+        removeTask(RUNNING, taskObject.taskId);
+      }
+    }
+  }
+
+  /**
+   * Helper class to manage the tasks to be tracked.
+   * This contains the taskId, request and the response (if available).
+   */
+  private static class TaskObject { // static: instances live in the static requestStatusMap
+    final String taskId;
+    String rspInfo; // response summary, or the exception text for a failed task
+
+    public TaskObject(String taskId) {
+      this.taskId = taskId;
+    }
+
+    public String getRspObject() {
+      return rspInfo;
+    }
+
+    public void setRspObject(SolrQueryResponse rspObject) {
+      this.rspInfo = rspObject.getToLogAsString("TaskId: " + this.taskId + " ");
+    }
+
+    public void setRspObjectFromException(Exception e) {
+      this.rspInfo = e.getMessage() != null ? e.getMessage() : e.toString(); // message may be null
+    }
+  }
+
+  /**
+   * Helper method to add a task to a tracking map.
+   */
+  protected void addTask(String map, TaskObject o, boolean limit) {
+    synchronized (getMap(map)) { // evict+insert as one atomic step; iterating a synchronizedMap needs its lock
+      if (limit && getMap(map).size() >= MAX_TRACKED_REQUESTS && !getMap(map).isEmpty()) {
+        getMap(map).remove(getMap(map).keySet().iterator().next()); // oldest entry (insertion order)
+      }
+      getMap(map).put(o.taskId, o);
+    }
+  }
+
+  protected void addTask(String map, TaskObject o) {
+    // Registers the task under its id in the named tracking map, holding that map's monitor.
+    final Map<String, TaskObject> target = getMap(map);
+    synchronized (target) { target.put(o.taskId, o); }
+  }
+
+  /**
+   * Helper method to remove a task from a tracking map.
+   */
+  protected void removeTask(String map, String taskId) {
+    // Drops the entry for taskId (if any) from the named tracking map, holding that map's monitor.
+    final Map<String, TaskObject> source = getMap(map);
+    synchronized (source) { source.remove(taskId); }
+  }
+
+  /**
+   * Helper method to check if a map contains a taskObject with the given taskId.
+   */
+  protected boolean mapContainsTask(String map, String taskId) {
+    return getMap(map).containsKey(taskId); // 'map' is a tracking-map name such as RUNNING, COMPLETED or FAILED
+  }
+
+  /**
+   * Helper method to get a TaskObject given a map and a taskId.
+   */
+  protected TaskObject getTask(String map, String taskId) {
+    return getMap(map).get(taskId); // null when the named map has no entry for taskId
+  }
+
+  /**
+   * Helper method to get a request status map given the name.
+   */
+  private Map<String, TaskObject> getMap(String map) {
+    return requestStatusMap.get(map); // null for a name that was never put into requestStatusMap
+  }
+
+  /**
+   * Method to ensure shutting down of the ThreadPool Executor.
+   */
+  public void shutdown() {
+    if (parallelExecutor == null || parallelExecutor.isShutdown()) return; // never created, or already stopped
+    ExecutorUtil.shutdownAndAwaitTermination(parallelExecutor);
+  }
 }

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java?rev=1577444&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java Fri Mar 14 08:01:18 2014
@@ -0,0 +1,125 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.junit.Before;
+
+import java.io.IOException;
+
+public class AsyncMigrateRouteKeyTest extends MigrateRouteKeyTest {
+
+  public AsyncMigrateRouteKeyTest() {
+    schemaString = "schema15.xml";      // we need a string id
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  public void doTest() throws Exception {
+    waitForThingsToLevelOut(15);
+
+    multipleShardMigrateTest();
+    printLayout();
+  }
+
+  protected void checkAsyncRequestForCompletion(String asyncId) throws SolrServerException, IOException {
+    ModifiableSolrParams params;
+    String message;
+    params = new ModifiableSolrParams();
+    params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
+    params.set(OverseerCollectionProcessor.REQUESTID, asyncId);
+    message = sendStatusRequestWithRetry(params, 10);
+    assertEquals("Task " + asyncId + " not found in completed tasks.",
+        "found " + asyncId + " in completed tasks", message);
+  }
+
+  @Override
+  protected void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    String asyncId = "20140128";
+    params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.MIGRATE.toString());
+    params.set("collection", sourceCollection);
+    params.set("target.collection", targetCollection);
+    params.set("split.key", splitKey);
+    params.set("forward.timeout", 45);
+    params.set("async", asyncId);
+
+    invoke(params);
+
+    checkAsyncRequestForCompletion(asyncId);
+  }
+
+  /**
+   * Helper method to send a status request with specific retry limit and return
+   * the message/null from the success response.
+   */
+  private String sendStatusRequestWithRetry(ModifiableSolrParams params, int maxCounter)
+      throws SolrServerException, IOException {
+    NamedList status = null;
+    String state = null;
+    String message = null;
+    NamedList r;
+    while (maxCounter-- > 0) {
+      r = sendRequest(params);
+      status = (NamedList) r.get("status");
+      state = (String) status.get("state");
+      message = (String) status.get("msg");
+
+      if (state.equals("completed") || state.equals("failed"))
+        return (String) status.get("msg");
+
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+
+      }
+
+    }
+    // Return last state?
+    return message;
+  }
+
+  protected NamedList sendRequest(ModifiableSolrParams params) throws SolrServerException, IOException {
+    SolrRequest request = new QueryRequest(params);
+    request.setPath("/admin/collections");
+
+    String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
+        .getBaseURL();
+    baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
+
+    HttpSolrServer baseServer = null;
+
+    try {
+      baseServer = new HttpSolrServer(baseUrl);
+      baseServer.setConnectionTimeout(15000);
+      return baseServer.request(request);
+    } finally {
+      baseServer.shutdown();
+    }
+  }
+}

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java?rev=1577444&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java Fri Mar 14 08:01:18 2014
@@ -0,0 +1,185 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer.RemoteSolrException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.params.MapSolrParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.SolrInfoMBean.Category;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.servlet.SolrDispatchFilter;
+import org.apache.solr.update.DirectUpdateHandler2;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.management.ObjectName;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
+
+/**
+ * Exercises the asynchronous variants of the Collections API through the SolrJ
+ * {@link CollectionAdminRequest} helpers: every call carries a request id and
+ * its outcome is read back by polling the request status.
+ */
+@Slow
+public class CollectionsAPIAsyncDistributedZkTest extends AbstractFullDistribZkTestBase {
+
+  private static final boolean DEBUG = false;
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    useJettyDataDir = false;
+
+    System.setProperty("numShards", Integer.toString(sliceCount));
+    System.setProperty("solr.xml.persist", "true");
+  }
+
+  public CollectionsAPIAsyncDistributedZkTest() {
+    fixShardCount = true;
+
+    sliceCount = 2;
+    shardCount = 4;
+  }
+
+  @Override
+  public void doTest() throws Exception {
+
+    testSolrJAPICalls();
+
+    if (DEBUG) {
+      super.printLayout();
+    }
+  }
+
+  /**
+   * Submits three async requests in turn and checks the final state of each:
+   * a collection creation (expected to complete), a second creation with the
+   * same collection name (expected to fail) and a shard split (expected to
+   * complete).
+   */
+  private void testSolrJAPICalls() throws Exception {
+    SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
+    try {
+      CollectionAdminRequest.createCollection("testasynccollectioncreation", 2, "conf1", server, "1001");
+      String state = getRequestStateAfterCompletion("1001", 10, server);
+      assertEquals("CreateCollection task did not complete!", "completed", state);
+
+      CollectionAdminRequest.createCollection("testasynccollectioncreation", 2, "conf1", server, "1002");
+      state = getRequestStateAfterCompletion("1002", 3, server);
+      assertEquals("Recreating a collection with the same name didn't fail, should have.", "failed", state);
+
+      CollectionAdminRequest.splitShard("testasynccollectioncreation", "shard1", server, "1003");
+      state = getRequestStateAfterCompletion("1003", 60, server);
+      assertEquals("Shard split did not complete. Last recorded state: " + state, "completed", state);
+    } finally {
+      // The client is created by this method, so release it here as well.
+      server.shutdown();
+    }
+  }
+
+  /**
+   * Polls the status of {@code requestId} once per second until it is
+   * {@code completed} or {@code failed}, or until {@code waitForSeconds}
+   * attempts have been made.
+   *
+   * @return the last state observed, or {@code null} if none was observed
+   */
+  private String getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrServer server)
+      throws IOException, SolrServerException {
+    String state = null;
+    while (waitForSeconds-- > 0) {
+      state = getRequestState(requestId, server);
+      // Constant-first equals: a missing state counts as "not finished yet"
+      // rather than causing an NPE.
+      if ("completed".equals(state) || "failed".equals(state)) {
+        return state;
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the caller and stop polling.
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+    return state;
+  }
+
+  /**
+   * Issues a single request-status call and extracts the {@code state} entry
+   * from the {@code status} section of the response.
+   */
+  private String getRequestState(String requestId, SolrServer server) throws IOException, SolrServerException {
+    CollectionAdminResponse response = CollectionAdminRequest.requestStatus(requestId, server);
+    NamedList<?> innerResponse = (NamedList<?>) response.getResponse().get("status");
+    return (String) innerResponse.get("state");
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    System.clearProperty("numShards");
+    System.clearProperty("zkHost");
+    System.clearProperty("solr.xml.persist");
+    
+    // insurance
+    DirectUpdateHandler2.commitOnClose = true;
+  }
+
+}

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java Fri Mar 14 08:01:18 2014
@@ -114,7 +114,7 @@ public class MigrateRouteKeyTest extends
     return ruleRemoved;
   }
 
-  private void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
+  protected void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set("action", CollectionParams.CollectionAction.MIGRATE.toString());
     params.set("collection", sourceCollection);
@@ -125,7 +125,7 @@ public class MigrateRouteKeyTest extends
     invoke(params);
   }
 
-  private void invoke(ModifiableSolrParams params) throws SolrServerException, IOException {
+  protected void invoke(ModifiableSolrParams params) throws SolrServerException, IOException {
     SolrRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
 
@@ -161,7 +161,7 @@ public class MigrateRouteKeyTest extends
     waitForRecoveriesToFinish(targetCollection, false);
   }
 
-  private void multipleShardMigrateTest() throws Exception  {
+  protected void multipleShardMigrateTest() throws Exception  {
     del("*:*");
     commit();
     assertTrue(cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound() == 0);

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java Fri Mar 14 08:01:18 2014
@@ -71,6 +71,9 @@ public class OverseerCollectionProcessor
   private static final String CONFIG_NAME = "myconfig";
   
   private static DistributedQueue workQueueMock;
+  private static DistributedMap runningMapMock;
+  private static DistributedMap completedMapMock;
+  private static DistributedMap failureMapMock;
   private static ShardHandler shardHandlerMock;
   private static ZkStateReader zkStateReaderMock;
   private static ClusterState clusterStateMock;
@@ -90,8 +93,10 @@ public class OverseerCollectionProcessor
     
     public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
         String myId, ShardHandler shardHandler, String adminPath,
-        DistributedQueue workQueue) {
-      super(zkStateReader, myId, shardHandler, adminPath, workQueue);
+        DistributedQueue workQueue, DistributedMap runningMap,
+        DistributedMap completedMap,
+        DistributedMap failureMap) {
+      super(zkStateReader, myId, shardHandler, adminPath, workQueue, runningMap, completedMap, failureMap);
     }
     
     @Override
@@ -111,6 +116,9 @@ public class OverseerCollectionProcessor
   @BeforeClass
   public static void setUpOnce() throws Exception {
     workQueueMock = createMock(DistributedQueue.class);
+    runningMapMock = createMock(DistributedMap.class);
+    completedMapMock = createMock(DistributedMap.class);
+    failureMapMock = createMock(DistributedMap.class);
     shardHandlerMock = createMock(ShardHandler.class);
     zkStateReaderMock = createMock(ZkStateReader.class);
     clusterStateMock = createMock(ClusterState.class);
@@ -120,6 +128,9 @@ public class OverseerCollectionProcessor
   @AfterClass
   public static void tearDownOnce() {
     workQueueMock = null;
+    runningMapMock = null;
+    completedMapMock = null;
+    failureMapMock = null;
     shardHandlerMock = null;
     zkStateReaderMock = null;
     clusterStateMock = null;
@@ -131,13 +142,16 @@ public class OverseerCollectionProcessor
     super.setUp();
     queue.clear();
     reset(workQueueMock);
-    reset(workQueueMock);
+    reset(runningMapMock);
+    reset(completedMapMock);
+    reset(failureMapMock);
     reset(shardHandlerMock);
     reset(zkStateReaderMock);
     reset(clusterStateMock);
     reset(solrZkClientMock);
     underTest = new OverseerCollectionProcessorToBeTested(zkStateReaderMock,
-        "1234", shardHandlerMock, ADMIN_PATH, workQueueMock);
+        "1234", shardHandlerMock, ADMIN_PATH, workQueueMock, runningMapMock,
+        completedMapMock, failureMapMock);
     zkMap.clear();
     collectionsSet.clear();
   }

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java?rev=1577444&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java Fri Mar 14 08:01:18 2014
@@ -0,0 +1,224 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.junit.Before;
+
+import java.io.IOException;
+
+public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
+
+  public TestRequestStatusCollectionAPI() {
+    schemaString = "schema15.xml";      // we need a string id
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+
+  }
+
+  public void doTest() {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+
+    params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
+    params.set("name", "collection2");
+    params.set("numShards", 2);
+    params.set("replicationFactor", 1);
+    params.set("maxShardsPerNode", 100);
+    params.set("collection.configName", "conf1");
+    params.set("async", "1000");
+    try {
+      sendRequest(params);
+    } catch (SolrServerException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    // Check for the request to be completed.
+    int maxCounter = 10;
+
+    NamedList r = null;
+    NamedList status = null;
+    String message = null;
+
+    params = new ModifiableSolrParams();
+
+    params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
+    params.set(OverseerCollectionProcessor.REQUESTID, "1000");
+
+    try {
+      message = sendStatusRequestWithRetry(params, 10);
+    } catch (SolrServerException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    assertEquals("Task 1000 not found in completed tasks.", "found 1000 in completed tasks", message);
+
+    // Check for a random (hopefully non-existent request id
+    params = new ModifiableSolrParams();
+    params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.REQUESTSTATUS.toString());
+    params.set(OverseerCollectionProcessor.REQUESTID, "9999999");
+    try {
+      r = sendRequest(params);
+      status = (NamedList) r.get("status");
+      message = (String) status.get("msg");
+    } catch (SolrServerException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    assertEquals("Task 9999999 found in tasks queue.", "Did not find taskid [9999999] in any tasks queue", message);
+
+    params = new ModifiableSolrParams();
+    params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.SPLITSHARD.toString());
+    params.set("collection", "collection2");
+    params.set("shard", "shard1");
+    params.set("async", "1001");
+    try {
+      sendRequest(params);
+    } catch (SolrServerException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    // Check for the request to be completed.
+      params = new ModifiableSolrParams();
+      params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
+      params.set(OverseerCollectionProcessor.REQUESTID, "1001");
+    try {
+      message = sendStatusRequestWithRetry(params, maxCounter);
+    } catch (SolrServerException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    assertEquals("Task 1001 not found in completed tasks.", "found 1001 in completed tasks", message);
+
+      params = new ModifiableSolrParams();
+      params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
+      params.set("name", "collection2");
+      params.set("numShards", 2);
+      params.set("replicationFactor", 1);
+      params.set("maxShardsPerNode", 100);
+      params.set("collection.configName", "conf1");
+      params.set("async", "1002");
+    try {
+      sendRequest(params);
+    } catch (SolrServerException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+
+    params = new ModifiableSolrParams();
+
+      params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
+      params.set(OverseerCollectionProcessor.REQUESTID, "1002");
+
+    try {
+      message = sendStatusRequestWithRetry(params, 10);
+    } catch (SolrServerException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+
+    assertEquals("Task 1002 not found in completed tasks.", "found 1002 in failed tasks", message);
+
+      params = new ModifiableSolrParams();
+      params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
+      params.set("name", "collection3");
+      params.set("numShards", 1);
+      params.set("replicationFactor", 1);
+      params.set("maxShardsPerNode", 100);
+      params.set("collection.configName", "conf1");
+      params.set("async", "1002");
+    try {
+      r = sendRequest(params);
+    } catch (SolrServerException e) {
+      e.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    assertEquals("Did not error out on duplicate requests (same request id)",
+          "Task with the same requestid already exists.", r.get("error"));
+
+  }
+
+
+  /**
+   * Helper method to send a status request with specific retry limit and return
+   * the message/null from the success response.
+   */
+  private String sendStatusRequestWithRetry(ModifiableSolrParams params, int maxCounter)
+      throws SolrServerException, IOException{
+    NamedList status = null;
+    String state = null;
+    String message = null;
+    NamedList r;
+    while(maxCounter-- > 0) {
+      r = sendRequest(params);
+      status = (NamedList) r.get("status");
+      state = (String) status.get("state");
+      message = (String) status.get("msg");
+
+      if(state.equals("completed") || state.equals("failed"))
+        return (String) status.get("msg");
+
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+
+      }
+
+    }
+    // Return last state?
+    return message;
+  }
+
+  protected NamedList sendRequest(ModifiableSolrParams params) throws SolrServerException, IOException {
+    SolrRequest request = new QueryRequest(params);
+    request.setPath("/admin/collections");
+
+    String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
+        .getBaseURL();
+    baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
+
+    HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
+    baseServer.setConnectionTimeout(15000);
+    return baseServer.request(request);
+  }
+}