You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@lucene.apache.org by an...@apache.org on 2014/03/14 09:01:20 UTC
svn commit: r1577444 [1/2] - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/core/
core/src/java/org/apache/solr/handler/admin/
core/src/test/org/apache/solr/cloud/
core/src/test/org/apache/solr/handler/...
Author: anshum
Date: Fri Mar 14 08:01:18 2014
New Revision: 1577444
URL: http://svn.apache.org/r1577444
Log:
SOLR-5477: Async execution of OverseerCollectionProcessor tasks
Added:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/admin/CoreAdminRequestStatusTest.java (with props)
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Fri Mar 14 08:01:18 2014
@@ -119,6 +119,8 @@ New Features
* SOLR-5653: Create a RestManager to provide REST API endpoints for
reconfigurable plugins. (Tim Potter, Steve Rowe)
+* SOLR-5477: Async execution of OverseerCollectionProcessor (Collections API)
+ tasks. (Anshum Gupta)
Bug Fixes
----------------------
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java?rev=1577444&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java Fri Mar 14 08:01:18 2014
@@ -0,0 +1,232 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A distributed map.
+ * This supports basic map functions e.g. get, put, contains for interaction with zk which
+ * don't have to be ordered i.e. DistributedQueue.
+ */
+public class DistributedMap {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(DistributedMap.class);
+
+ private static long DEFAULT_TIMEOUT = 5*60*1000;
+
+ private final String dir;
+
+ private SolrZkClient zookeeper;
+ private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+ private final String prefix = "mn-";
+
+ private final String response_prefix = "mnr-" ;
+
+ public DistributedMap(SolrZkClient zookeeper, String dir, List<ACL> acl) {
+ this.dir = dir;
+
+ ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
+ try {
+ cmdExecutor.ensureExists(dir, zookeeper);
+ } catch (KeeperException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+
+ if (acl != null) {
+ this.acl = acl;
+ }
+ this.zookeeper = zookeeper;
+ }
+
+ private class LatchChildWatcher implements Watcher {
+
+ Object lock = new Object();
+ private WatchedEvent event = null;
+
+ public LatchChildWatcher() {}
+
+ public LatchChildWatcher(Object lock) {
+ this.lock = lock;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: "
+ + event.getState() + " type " + event.getType());
+ synchronized (lock) {
+ this.event = event;
+ lock.notifyAll();
+ }
+ }
+
+ public void await(long timeout) throws InterruptedException {
+ synchronized (lock) {
+ lock.wait(timeout);
+ }
+ }
+
+ public WatchedEvent getWatchedEvent() {
+ return event;
+ }
+ }
+
+ /**
+ * Inserts data into zookeeper.
+ *
+ * @return true if data was successfully added
+ */
+ private String createData(String path, byte[] data, CreateMode mode)
+ throws KeeperException, InterruptedException {
+ for (;;) {
+ try {
+ return zookeeper.create(path, data, acl, mode, true);
+ } catch (KeeperException.NoNodeException e) {
+ try {
+ zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException ne) {
+ // someone created it
+ }
+ }
+ }
+ }
+
+
+ public boolean put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
+ return createData(dir + "/" + prefix + trackingId, data,
+ CreateMode.PERSISTENT) != null;
+ }
+
+ /**
+ * Offer the data and wait for the response
+ *
+ */
+ public MapEvent put(String trackingId, byte[] data, long timeout) throws KeeperException,
+ InterruptedException {
+ String path = createData(dir + "/" + prefix + trackingId, data,
+ CreateMode.PERSISTENT);
+ String watchID = createData(
+ dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
+ null, CreateMode.EPHEMERAL);
+ Object lock = new Object();
+ LatchChildWatcher watcher = new LatchChildWatcher(lock);
+ synchronized (lock) {
+ if (zookeeper.exists(watchID, watcher, true) != null) {
+ watcher.await(timeout);
+ }
+ }
+ byte[] bytes = zookeeper.getData(watchID, null, null, true);
+ zookeeper.delete(watchID, -1, true);
+ return new MapEvent(watchID, bytes, watcher.getWatchedEvent());
+ }
+
+ public MapEvent get(String trackingId) throws KeeperException, InterruptedException {
+ return new MapEvent(trackingId, zookeeper.getData(dir + "/" + prefix + trackingId, null, null, true), null);
+ }
+
+ public boolean contains(String trackingId) throws KeeperException, InterruptedException {
+ return zookeeper.exists(dir + "/" + prefix + trackingId, true);
+ }
+
+ public void remove(String trackingId) throws KeeperException, InterruptedException {
+ zookeeper.delete(dir + "/" + prefix + trackingId, -1, true);
+ }
+
+ /**
+ * Helper method to clear all child nodes for a parent node.
+ */
+ public void clear() throws KeeperException, InterruptedException {
+ List<String> childNames = zookeeper.getChildren(dir, null, true);
+ for(String childName: childNames) {
+ zookeeper.delete(dir + "/" + childName, -1, true);
+ }
+
+ }
+
+ public static class MapEvent {
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((id == null) ? 0 : id.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null) return false;
+ if (getClass() != obj.getClass()) return false;
+ MapEvent other = (MapEvent) obj;
+ if (id == null) {
+ if (other.id != null) return false;
+ } else if (!id.equals(other.id)) return false;
+ return true;
+ }
+
+ private WatchedEvent event = null;
+ private String id;
+ private byte[] bytes;
+
+ MapEvent(String id, byte[] bytes, WatchedEvent event) {
+ this.id = id;
+ this.bytes = bytes;
+ this.event = event;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setBytes(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ public byte[] getBytes() {
+ return bytes;
+ }
+
+ public WatchedEvent getWatchedEvent() {
+ return event;
+ }
+
+ }
+
+
+}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Mar 14 08:01:18 2014
@@ -81,6 +81,13 @@ public class Overseer {
//Internal queue where overseer stores events that have not yet been published into cloudstate
//If Overseer dies while extracting the main queue a new overseer will start from this queue
private final DistributedQueue workQueue;
+ // Internal map which holds the information about running tasks.
+ private final DistributedMap runningMap;
+ // Internal map which holds the information about successfully completed tasks.
+ private final DistributedMap completedMap;
+ // Internal map which holds the information about failed tasks.
+ private final DistributedMap failureMap;
+
private Map clusterProps;
private boolean isClosed = false;
@@ -88,6 +95,9 @@ public class Overseer {
this.zkClient = reader.getZkClient();
this.stateUpdateQueue = getInQueue(zkClient);
this.workQueue = getInternalQueue(zkClient);
+ this.failureMap = getFailureMap(zkClient);
+ this.runningMap = getRunningMap(zkClient);
+ this.completedMap = getCompletedMap(zkClient);
this.myId = myId;
this.reader = reader;
clusterProps = reader.getClusterProps();
@@ -135,7 +145,7 @@ public class Overseer {
}
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(clusterState), true);
-
+
workQueue.poll(); // poll-ing removes the element we got by peek-ing
}
else {
@@ -1135,6 +1145,24 @@ public class Overseer {
createOverseerNode(zkClient);
return new DistributedQueue(zkClient, "/overseer/queue-work", null);
}
+
+ /* Internal map for running tasks, not to be used outside of the Overseer */
+ static DistributedMap getRunningMap(final SolrZkClient zkClient) {
+ return getCollectionTaskMap(zkClient, "/overseer/collection-map-running");
+ }
+
+ /* Internal map for successfully completed tasks, not to be used outside of the Overseer */
+ static DistributedMap getCompletedMap(final SolrZkClient zkClient) {
+ return getCollectionTaskMap(zkClient, "/overseer/collection-map-completed");
+ }
+
+ /* Internal map for failed tasks, not to be used outside of the Overseer */
+ static DistributedMap getFailureMap(final SolrZkClient zkClient) {
+ return getCollectionTaskMap(zkClient, "/overseer/collection-map-failure");
+ }
+
+ /* Calls createOverseerNode, then opens a DistributedMap rooted at path with a null ACL (OPEN_ACL_UNSAFE). */
+ private static DistributedMap getCollectionTaskMap(final SolrZkClient zkClient, final String path) {
+ createOverseerNode(zkClient);
+ return new DistributedMap(zkClient, path, null);
+ }
/* Collection creation queue */
static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Fri Mar 14 08:01:18 2014
@@ -61,10 +61,8 @@ import org.apache.solr.handler.component
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +80,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.solr.cloud.Assign.Node;
@@ -118,16 +117,22 @@ public class OverseerCollectionProcessor
public static final String DELETESHARD = "deleteshard";
+ public static final String REQUESTSTATUS = "status";
+
public static final String ROUTER = "router";
public static final String SHARDS_PROP = "shards";
+ public static final String ASYNC = "async";
+
public static final String CREATESHARD = "createshard";
public static final String DELETEREPLICA = "deletereplica";
public static final String MIGRATE = "migrate";
+ public static final String REQUESTID = "requestid";
+
public static final String COLL_CONF = "collection.configName";
public static final String COLL_PROP_PREFIX = "property.";
@@ -149,6 +154,9 @@ public class OverseerCollectionProcessor
.getLogger(OverseerCollectionProcessor.class);
private DistributedQueue workQueue;
+ private DistributedMap runningMap;
+ private DistributedMap completedMap;
+ private DistributedMap failureMap;
private String myId;
@@ -161,15 +169,25 @@ public class OverseerCollectionProcessor
private boolean isClosed;
public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
- this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()));
+ this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()),
+ Overseer.getRunningMap(zkStateReader.getZkClient()),
+ Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient()));
}
- protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath, DistributedQueue workQueue) {
+ protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler,
+ String adminPath,
+ DistributedQueue workQueue,
+ DistributedMap runningMap,
+ DistributedMap completedMap,
+ DistributedMap failureMap) {
this.zkStateReader = zkStateReader;
this.myId = myId;
this.shardHandler = shardHandler;
this.adminPath = adminPath;
this.workQueue = workQueue;
+ this.runningMap = runningMap;
+ this.completedMap = completedMap;
+ this.failureMap = failureMap;
}
@Override
@@ -200,11 +218,35 @@ public class OverseerCollectionProcessor
QueueEvent head = workQueue.peek(true);
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+
+ final String asyncId = (message.containsKey(ASYNC) && message.get(ASYNC) != null) ? (String) message.get(ASYNC) : null;
+
+ try {
+ if(message.containsKey(ASYNC) && message.get(ASYNC) != null && !runningMap.contains(message.getStr(ASYNC)))
+ runningMap.put(asyncId, null);
+ } catch (KeeperException.NodeExistsException e) {
+ // Just catch and do nothing. The runningMap.contains(..) check ensures that this is the only
+ // entry point into the runningMap.
+ // NOTE: Make sure to handle it as soon as OCP gets distributed/multi-threaded.
+ }
+
log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
final String operation = message.getStr(QUEUE_OPERATION);
SolrResponse response = processMessage(message, operation);
+
head.setBytes(SolrResponse.serializable(response));
+ if (!operation.equals(REQUESTSTATUS) && asyncId != null) {
+ if(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null) {
+ failureMap.put(asyncId, null);
+ } else {
+ completedMap.put(asyncId, null);
+ }
+ }
+ if(asyncId != null)
+ runningMap.remove(asyncId);
+
workQueue.remove(head);
+
log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -412,9 +454,9 @@ public class OverseerCollectionProcessor
processRoleCommand(message, operation);
} else if (ADDREPLICA.isEqual(operation)) {
addReplica(zkStateReader.getClusterState(), message, results);
- }
-
- else {
+ } else if (REQUESTSTATUS.equals(operation)) {
+ requestStatus(message, results);
+ } else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation);
}
@@ -764,13 +806,7 @@ public class OverseerCollectionProcessor
}
- ShardResponse srsp;
- do {
- srsp = shardHandler.takeCompletedOrError();
- if (srsp != null) {
- processResponse(results, srsp);
- }
- } while (srsp != null);
+ processResponses(results);
log.info("Finished create command on all shards for collection: "
+ collectionName);
@@ -931,6 +967,9 @@ public class OverseerCollectionProcessor
// the only side effect of this is that the sub shard may end up having more replicas than we want
collectShardResponses(results, false, null);
+ String asyncId = message.getStr(ASYNC);
+ HashMap<String, String> requestMap = new HashMap<String, String>();
+
for (int i=0; i<subRanges.size(); i++) {
String subSlice = subSlices.get(i);
String subShardName = subShardNames.get(i);
@@ -957,12 +996,15 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.NAME, subShardName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, subSlice);
+ setupAsyncRequest(asyncId, requestMap, params, nodeName);
addPropertyParams(message, params);
sendShardRequest(nodeName, params);
}
collectShardResponses(results, true,
- "SPLTSHARD failed to create subshard leaders");
+ "SPLITSHARD failed to create subshard leaders");
+
+ completeAsyncRequest(asyncId, requestMap, results);
for (String subShardName : subShardNames) {
// wait for parent leader to acknowledge the sub-shard core
@@ -975,12 +1017,18 @@ public class OverseerCollectionProcessor
cmd.setState(ZkStateReader.ACTIVE);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
- sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
+
+ ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
+ setupAsyncRequest(asyncId, requestMap, p, nodeName);
+
+ sendShardRequest(nodeName, p);
}
collectShardResponses(results, true,
- "SPLTSHARD timed out waiting for subshard leaders to come up");
-
+ "SPLITSHARD timed out waiting for subshard leaders to come up");
+
+ completeAsyncRequest(asyncId, requestMap, results);
+
log.info("Successfully created all sub-shards for collection "
+ collectionName + " parent shard: " + slice + " on: " + parentShardLeader);
@@ -996,9 +1044,12 @@ public class OverseerCollectionProcessor
params.add(CoreAdminParams.TARGET_CORE, subShardName);
}
params.set(CoreAdminParams.RANGES, rangesStr);
+ setupAsyncRequest(asyncId, requestMap, params, parentShardLeader.getNodeName());
sendShardRequest(parentShardLeader.getNodeName(), params);
+
collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command");
+ completeAsyncRequest(asyncId, requestMap, results);
log.info("Index on shard: " + nodeName + " split into two successfully");
@@ -1012,12 +1063,16 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
params.set(CoreAdminParams.NAME, subShardName);
+ setupAsyncRequest(asyncId, requestMap, params, nodeName);
+
sendShardRequest(nodeName, params);
}
collectShardResponses(results, true,
"SPLITSHARD failed while asking sub shard leaders to apply buffered updates");
+ completeAsyncRequest(asyncId, requestMap, results);
+
log.info("Successfully applied buffered updates on : " + subShardNames);
// Replica creation for the new Slices
@@ -1067,6 +1122,12 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.NAME, shardName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
+ if(asyncId != null) {
+ String requestId = asyncId + Math.abs(System.nanoTime());
+ params.set(ASYNC, requestId);
+ requestMap.put(subShardNodeName, requestId);
+ }
+
addPropertyParams(message, params);
// TODO: Figure the config used by the parent shard and use it.
//params.set("collection.configName", configName);
@@ -1086,12 +1147,19 @@ public class OverseerCollectionProcessor
cmd.setState(ZkStateReader.RECOVERING);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
- sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
+ ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
+
+ setupAsyncRequest(asyncId, requestMap, p, nodeName);
+
+ sendShardRequest(nodeName, p);
+
}
}
collectShardResponses(results, true,
- "SPLTSHARD failed to create subshard replicas or timed out waiting for them to come up");
+ "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up");
+
+ completeAsyncRequest(asyncId, requestMap, results);
log.info("Successfully created all replica shards for all sub-slices " + subSlices);
@@ -1211,7 +1279,40 @@ public class OverseerCollectionProcessor
} while (srsp != null);
}
-
+ /**
+ * Handles REQUESTSTATUS: reports whether the async request named by REQUESTID is completed,
+ * failed, running or notfound. The special id "-1" instead clears the completed and failed maps.
+ */
+ private void requestStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+ log.info("Request status invoked");
+ String requestId = message.getStr(REQUESTID);
+ if (requestId == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Missing required parameter: " + REQUESTID);
+ }
+
+ // Special taskId (-1), clears up the request state maps.
+ if (requestId.equals("-1")) {
+ completedMap.clear();
+ failureMap.clear();
+ return;
+ }
+
+ // Check the terminal maps before the running map: the processor loop adds an id to the
+ // completed/failure map before removing it from the running map, so it can briefly be in both.
+ String state;
+ String msg;
+ if (completedMap.contains(requestId)) {
+ state = "completed";
+ msg = "found " + requestId + " in completed tasks";
+ } else if (failureMap.contains(requestId)) {
+ state = "failed";
+ msg = "found " + requestId + " in failed tasks";
+ } else if (runningMap.contains(requestId)) {
+ state = "running";
+ msg = "found " + requestId + " in submitted tasks";
+ } else {
+ state = "notfound";
+ msg = "Did not find taskid [" + requestId + "] in any tasks queue";
+ }
+
+ SimpleOrderedMap status = new SimpleOrderedMap();
+ status.add("state", state);
+ status.add("msg", msg);
+ results.add("status", status);
+ }
+
private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
log.info("Delete shard invoked");
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
@@ -1222,7 +1323,7 @@ public class OverseerCollectionProcessor
if (slice == null) {
if(clusterState.hasCollection(collection)) {
throw new SolrException(ErrorCode.BAD_REQUEST,
- "No shard with the specified name exists: " + slice);
+ "No shard with the specified name exists: " + slice.getName());
} else {
throw new SolrException(ErrorCode.BAD_REQUEST,
"No collection with the specified name exists: " + collection);
@@ -1242,13 +1343,7 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.DELETE_INDEX, "true");
sliceCmd(clusterState, params, null, slice);
- ShardResponse srsp;
- do {
- srsp = shardHandler.takeCompletedOrError();
- if (srsp != null) {
- processResponse(results, srsp);
- }
- } while (srsp != null);
+ processResponses(results);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
Overseer.REMOVESHARD, ZkStateReader.COLLECTION_PROP, collection,
@@ -1314,21 +1409,29 @@ public class OverseerCollectionProcessor
"No active slices available in target collection: " + targetCollection + "for given split.key: " + splitKey);
}
+ String asyncId = null;
+ if(message.containsKey(ASYNC) && message.get(ASYNC) != null)
+ asyncId = message.getStr(ASYNC);
+
for (Slice sourceSlice : sourceSlices) {
for (Slice targetSlice : targetSlices) {
log.info("Migrating source shard: {} to target shard: {} for split.key = " + splitKey, sourceSlice, targetSlice);
- migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey, timeout, results);
+ migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey, timeout, results, asyncId);
}
}
}
- private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice, DocCollection targetCollection, Slice targetSlice, String splitKey, int timeout, NamedList results) throws KeeperException, InterruptedException {
+ private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
+ DocCollection targetCollection, Slice targetSlice,
+ String splitKey, int timeout,
+ NamedList results, String asyncId) throws KeeperException, InterruptedException {
String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
if (clusterState.hasCollection(tempSourceCollectionName)) {
log.info("Deleting temporary collection: " + tempSourceCollectionName);
Map<String, Object> props = ZkNodeProps.makeMap(
QUEUE_OPERATION, DELETECOLLECTION,
"name", tempSourceCollectionName);
+
try {
deleteCollection(new ZkNodeProps(props), results);
} catch (Exception e) {
@@ -1350,15 +1453,23 @@ public class OverseerCollectionProcessor
log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
+ // For tracking async calls.
+ HashMap<String, String> requestMap = new HashMap<String, String>();
log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
+ targetLeader.getStr("core") + " to buffer updates");
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTBUFFERUPDATES.toString());
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+ String nodeName = targetLeader.getNodeName();
+ setupAsyncRequest(asyncId, requestMap, params, nodeName);
+
sendShardRequest(targetLeader.getNodeName(), params);
+
collectShardResponses(results, true, "MIGRATE failed to request node to buffer updates");
+ completeAsyncRequest(asyncId, requestMap, results);
+
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, Overseer.ADD_ROUTING_RULE,
COLLECTION_PROP, sourceCollection.getName(),
@@ -1405,6 +1516,11 @@ public class OverseerCollectionProcessor
NUM_SLICES, 1,
COLL_CONF, configName,
CREATE_NODE_SET, sourceLeader.getNodeName());
+ if(asyncId != null) {
+ String internalAsyncId = asyncId + Math.abs(System.nanoTime());
+ props.put(ASYNC, internalAsyncId);
+ }
+
log.info("Creating temporary collection: " + props);
createCollection(clusterState, new ZkNodeProps(props), results);
// refresh cluster state
@@ -1437,8 +1553,13 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.RANGES, splitRange.toString());
params.set("split.key", splitKey);
- sendShardRequest(sourceLeader.getNodeName(), params);
+ String tempNodeName = sourceLeader.getNodeName();
+
+ setupAsyncRequest(asyncId, requestMap, params, tempNodeName);
+
+ sendShardRequest(tempNodeName, params);
collectShardResponses(results, true, "MIGRATE failed to invoke SPLIT core admin command");
+ completeAsyncRequest(asyncId, requestMap, results);
log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
tempSourceCollectionName, targetLeader.getNodeName());
@@ -1448,7 +1569,13 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.NAME, tempCollectionReplica2);
params.set(CoreAdminParams.COLLECTION, tempSourceCollectionName);
params.set(CoreAdminParams.SHARD, tempSourceSlice.getName());
+
+ setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
sendShardRequest(targetLeader.getNodeName(), params);
+ collectShardResponses(results, true,
+ "MIGRATE failed to create replica of temporary collection in target leader node.");
+
+ completeAsyncRequest(asyncId, requestMap, results);
coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
zkStateReader.getBaseUrlForNodeName(targetLeader.getNodeName()), tempCollectionReplica2);
@@ -1458,14 +1585,19 @@ public class OverseerCollectionProcessor
cmd.setCoreName(tempSourceLeader.getStr("core"));
cmd.setNodeName(targetLeader.getNodeName());
cmd.setCoreNodeName(coreNodeName);
- cmd.setState(ZkStateReader.ACTIVE); // todo introduce asynchronous actions
+ cmd.setState(ZkStateReader.ACTIVE);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
- sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()));
+ params = new ModifiableSolrParams(cmd.getParams());
+
+ setupAsyncRequest(asyncId, requestMap, params, tempSourceLeader.getNodeName());
+
+ sendShardRequest(tempSourceLeader.getNodeName(), params);
collectShardResponses(results, true,
"MIGRATE failed to create temp collection replica or timed out waiting for them to come up");
+ completeAsyncRequest(asyncId, requestMap, results);
log.info("Successfully created replica of temp source collection on target leader node");
log.info("Requesting merge of temp source collection replica to target leader");
@@ -1473,18 +1605,27 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.ACTION, CoreAdminAction.MERGEINDEXES.toString());
params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
+
+ setupAsyncRequest(asyncId, requestMap, params, sourceLeader.getNodeName());
+
sendShardRequest(targetLeader.getNodeName(), params);
collectShardResponses(results, true,
"MIGRATE failed to merge " + tempCollectionReplica2 + " to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName());
+ completeAsyncRequest(asyncId, requestMap, results);
+
log.info("Asking target leader to apply buffered updates");
params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+ setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
+
sendShardRequest(targetLeader.getNodeName(), params);
collectShardResponses(results, true,
"MIGRATE failed to request node to apply buffered updates");
+ completeAsyncRequest(asyncId, requestMap, results);
+
try {
log.info("Deleting temporary collection: " + tempSourceCollectionName);
props = ZkNodeProps.makeMap(
@@ -1497,6 +1638,21 @@ public class OverseerCollectionProcessor
}
}
+  /**
+   * Blocks until every CoreAdmin async request recorded in {@code requestMap} has finished,
+   * adding each final status response to {@code results}, then empties the map so the next
+   * phase of the command starts with a clean slate. No-op for non-async commands.
+   */
+  private void completeAsyncRequest(String asyncId, HashMap<String, String> requestMap, NamedList results) {
+    if (asyncId == null) {
+      return; // synchronous command: nothing was tagged, so nothing to wait for
+    }
+    waitForAsyncCallsToComplete(requestMap, results);
+    requestMap.clear();
+  }
+
+ private void setupAsyncRequest(String asyncId, HashMap<String, String> requestMap, ModifiableSolrParams params, String nodeName) {
+ if(asyncId != null) {
+ String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
+ params.set(ASYNC, coreAdminAsyncId);
+ requestMap.put(nodeName, coreAdminAsyncId);
+ }
+ }
+
private DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
if (a == null || b == null || !a.overlaps(b)) {
return null;
@@ -1542,6 +1698,11 @@ public class OverseerCollectionProcessor
// if it does not, find best nodes to create more cores
int repFactor = message.getInt( REPLICATION_FACTOR, 1);
+
+ String async = null;
+ if (message.containsKey("async"))
+ async = message.getStr("async");
+
Integer numSlices = message.getInt(NUM_SLICES, null);
String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
List<String> shardNames = new ArrayList<>();
@@ -1627,6 +1788,9 @@ public class OverseerCollectionProcessor
if (!created)
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully createcollection: " + message.getStr("name"));
+ // For tracking async calls.
+ HashMap<String, String> requestMap = new HashMap<String, String>();
+
log.info("Creating SolrCores for new collection, shardNames {} , replicationFactor : {}", shardNames, repFactor);
Map<String ,ShardRequest> coresToCreate = new LinkedHashMap<>();
for (int i = 1; i <= shardNames.size(); i++) {
@@ -1662,6 +1826,11 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+ String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
+ if (replica.startsWith("http://")) replica = replica.substring(7);
+
+ setupAsyncRequest(async, requestMap, params, nodeName);
+
addPropertyParams(message, params);
ShardRequest sreq = new ShardRequest();
@@ -1689,13 +1858,9 @@ public class OverseerCollectionProcessor
}
}
- ShardResponse srsp;
- do {
- srsp = shardHandler.takeCompletedOrError();
- if (srsp != null) {
- processResponse(results, srsp);
- }
- } while (srsp != null);
+ processResponses(results);
+
+ completeAsyncRequest(async, requestMap, results);
log.info("Finished create command on all shards for collection: "
+ collectionName);
@@ -1829,6 +1994,18 @@ public class OverseerCollectionProcessor
"ADDREPLICA failed to create replica");
}
+
+  /**
+   * Drains every pending response from the shard handler, folding each one into
+   * {@code results} via {@code processResponse}.
+   */
+  private void processResponses(NamedList results) {
+    for (ShardResponse srsp = shardHandler.takeCompletedOrError(); srsp != null;
+         srsp = shardHandler.takeCompletedOrError()) {
+      processResponse(results, srsp);
+    }
+  }
+
+
private String createConfNode(String coll, ZkNodeProps message, boolean isLegacyCloud) throws KeeperException, InterruptedException {
String configName = message.getStr(OverseerCollectionProcessor.COLL_CONF);
if(configName == null){
@@ -1873,14 +2050,8 @@ public class OverseerCollectionProcessor
Slice slice = entry.getValue();
sliceCmd(clusterState, params, stateMatcher, slice);
}
-
- ShardResponse srsp;
- do {
- srsp = shardHandler.takeCompletedOrError();
- if (srsp != null) {
- processResponse(results, srsp);
- }
- } while (srsp != null);
+
+ processResponses(results);
}
@@ -1950,4 +2121,71 @@ public class OverseerCollectionProcessor
return isClosed;
}
+  /**
+   * Waits, node by node, for every tracked CoreAdmin async request (node name -> async id) and
+   * adds each final status response to {@code results} under its async id.
+   */
+  private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
+    for (Map.Entry<String, String> entry : requestMap.entrySet()) {
+      String nodeName = entry.getKey();
+      String coreAdminAsyncId = entry.getValue();
+      log.debug("I am Waiting for : " + nodeName + "/" + coreAdminAsyncId);
+      results.add(coreAdminAsyncId, waitForCoreAdminAsyncCallToComplete(nodeName, coreAdminAsyncId));
+    }
+  }
+
+  /**
+   * Polls {@code nodeName} with a CoreAdmin REQUESTSTATUS call for {@code requestId} until the
+   * task reports "completed" or "failed", and returns that status response.
+   * <p>
+   * While the task is "running" the node is re-polled every second with no overall time limit.
+   * A "notfound" status is retried up to five times, one second apart, before giving up.
+   *
+   * @param nodeName  the live node the async CoreAdmin request was sent to
+   * @param requestId the CoreAdmin async id attached to that request
+   * @return the REQUESTSTATUS response of the finished (completed or failed) task
+   * @throws SolrException if no status can be read, the status is unrecognized, the task is
+   *         still "notfound" after the retries, or the waiting thread is interrupted
+   */
+  private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
+    params.set(CoreAdminParams.REQUESTID, requestId);
+    params.set("qt", adminPath); // loop-invariant, so set once
+    int counter = 0;
+    ShardRequest sreq;
+    do {
+      sreq = new ShardRequest();
+      sreq.purpose = 1;
+      String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
+      if (replica.startsWith("http://")) replica = replica.substring(7);
+      sreq.shards = new String[] {replica};
+      sreq.actualShards = sreq.shards;
+      sreq.params = params;
+
+      shardHandler.submit(sreq, replica, sreq.params);
+
+      ShardResponse srsp;
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          NamedList results = new NamedList();
+          processResponse(results, srsp);
+          // The response or its body may be absent (e.g. if the status call itself failed);
+          // guard here instead of dereferencing blindly.
+          NamedList response = srsp.getSolrResponse() == null ? null : srsp.getSolrResponse().getResponse();
+          String r = response == null ? null : (String) response.get("STATUS");
+          if ("running".equals(r)) {
+            log.debug("The task is still RUNNING, continuing to wait.");
+            sleepBeforeStatusRetry(nodeName, requestId);
+            continue; // re-checks srsp != null, drains the handler, then resubmits
+          } else if ("completed".equals(r)) {
+            log.debug("The task is COMPLETED, returning");
+            return response;
+          } else if ("failed".equals(r)) {
+            // TODO: Improve this. Get more information.
+            log.debug("The task is FAILED, returning");
+            return response;
+          } else if ("notfound".equals(r)) {
+            log.debug("The task is notfound, retry");
+            if (counter++ < 5) {
+              sleepBeforeStatusRetry(nodeName, requestId);
+              break; // leave the drain loop and resubmit the status request
+            }
+            throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request: " + r
+                + " retried " + counter + " times");
+          } else if (r == null) {
+            throw new SolrException(ErrorCode.SERVER_ERROR, "Could not read status of async request "
+                + requestId + " from node " + nodeName, srsp.getException());
+          } else {
+            throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request " + r);
+          }
+        }
+      } while (srsp != null);
+    } while (true);
+  }
+
+  /**
+   * Sleeps one second between REQUESTSTATUS polls. If interrupted, it restores the interrupt
+   * flag and aborts the wait instead of looping on with the flag cleared.
+   */
+  private void sleepBeforeStatusRetry(String nodeName, String requestId) {
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted while waiting for async request "
+          + requestId + " on node " + nodeName, e);
+    }
+  }
+
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Mar 14 08:01:18 2014
@@ -92,6 +92,10 @@ public final class ZkController {
private final DistributedQueue overseerJobQueue;
private final DistributedQueue overseerCollectionQueue;
+
+ private final DistributedMap overseerRunningMap;
+ private final DistributedMap overseerCompletedMap;
+ private final DistributedMap overseerFailureMap;
public static final String CONFIGS_ZKNODE = "/configs";
@@ -279,6 +283,9 @@ public final class ZkController {
this.overseerJobQueue = Overseer.getInQueue(zkClient);
this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
+ this.overseerRunningMap = Overseer.getRunningMap(zkClient);
+ this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
+ this.overseerFailureMap = Overseer.getFailureMap(zkClient);
cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
leaderElector = new LeaderElector(zkClient);
zkStateReader = new ZkStateReader(zkClient);
@@ -1583,6 +1590,18 @@ public final class ZkController {
/** Returns the overseer collection queue, initialized from {@code Overseer.getCollectionQueue(zkClient)}. */
public DistributedQueue getOverseerCollectionQueue() {
return overseerCollectionQueue;
}
+
+  /** Returns the overseer's running-task map, initialized from {@code Overseer.getRunningMap(zkClient)}. */
+  public DistributedMap getOverseerRunningMap() {
+    return this.overseerRunningMap;
+  }
+
+  /** Returns the overseer's completed-task map, initialized from {@code Overseer.getCompletedMap(zkClient)}. */
+  public DistributedMap getOverseerCompletedMap() {
+    return this.overseerCompletedMap;
+  }
+
+  /** Returns the overseer's failed-task map, initialized from {@code Overseer.getFailureMap(zkClient)}. */
+  public DistributedMap getOverseerFailureMap() {
+    return this.overseerFailureMap;
+  }
public int getClientTimeout() {
return clientTimeout;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java Fri Mar 14 08:01:18 2014
@@ -368,6 +368,13 @@ public class CoreContainer {
}
try {
+ coreAdminHandler.shutdown();
+ } catch (Exception e) {
+ log.warn("Error shutting down CoreAdminHandler. Continuing to shutdown CoreContainer.");
+ e.printStackTrace();
+ }
+
+ try {
// First wake up the closer thread, it'll terminate almost immediately since it checks isShutDown.
synchronized (solrCores.getModifyLock()) {
solrCores.getModifyLock().notifyAll(); // wake up anyone waiting
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Fri Mar 14 08:01:18 2014
@@ -18,6 +18,7 @@ package org.apache.solr.handler.admin;
*/
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC;
import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATESHARD;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
@@ -25,6 +26,7 @@ import static org.apache.solr.cloud.Over
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
@@ -51,6 +53,7 @@ import org.apache.solr.client.solrj.requ
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
+import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
@@ -199,6 +202,10 @@ public class CollectionsHandler extends
this.handleAddReplica(req, rsp);
break;
}
+ case REQUESTSTATUS: {
+ this.handleRequestStatus(req, rsp);
+ break;
+ }
default: {
throw new RuntimeException("Unknown action: " + action);
}
@@ -236,6 +243,16 @@ public class CollectionsHandler extends
public static long DEFAULT_ZK_TIMEOUT = 180*1000;
+  /**
+   * Handles the collection-level REQUESTSTATUS action: requires a {@code requestid} parameter and
+   * forwards a REQUESTSTATUS message carrying it to the overseer through {@code handleResponse}.
+   */
+  private void handleRequestStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+    log.debug("REQUESTSTATUS action invoked: " + req.getParamString());
+    req.getParams().required().check(REQUESTID);
+    String requestId = req.getParams().get(REQUESTID);
+
+    Map<String, Object> props = new HashMap<>();
+    props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.REQUESTSTATUS);
+    props.put(REQUESTID, requestId);
+    handleResponse(OverseerCollectionProcessor.REQUESTSTATUS, new ZkNodeProps(props), rsp);
+  }
+
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
@@ -244,6 +261,35 @@ public class CollectionsHandler extends
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
long time = System.nanoTime();
+
+ if(m.containsKey(ASYNC) && m.get(ASYNC) != null) {
+
+ String asyncId = m.getStr(ASYNC);
+
+ if(asyncId.equals("-1")) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
+ }
+
+ NamedList<String> r = new NamedList<>();
+
+ if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
+ coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
+ coreContainer.getZkController().getOverseerRunningMap().contains(asyncId)) {
+ r.add("error", "Task with the same requestid already exists.");
+
+ } else {
+ coreContainer.getZkController().getOverseerCollectionQueue()
+ .offer(ZkStateReader.toJSON(m));
+
+ }
+ r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
+ SolrResponse response = new OverseerSolrResponse(r);
+
+ rsp.getValues().addAll(response.getResponse());
+
+ return;
+ }
+
QueueEvent event = coreContainer.getZkController()
.getOverseerCollectionQueue()
.offer(ZkStateReader.toJSON(m), timeout);
@@ -368,6 +414,7 @@ public class CollectionsHandler extends
MAX_SHARDS_PER_NODE,
CREATE_NODE_SET ,
SHARDS_PROP,
+ ASYNC,
"router.");
copyPropertiesIfNotNull(req.getParams(), props);
@@ -380,7 +427,7 @@ public class CollectionsHandler extends
log.info("Remove replica: " + req.getParamString());
req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP, "replica");
Map<String, Object> map = makeMap(QUEUE_OPERATION, DELETEREPLICA);
- copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica");
+ copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica", ASYNC);
ZkNodeProps m = new ZkNodeProps(map);
handleResponse(DELETEREPLICA, m, rsp);
}
@@ -394,7 +441,7 @@ public class CollectionsHandler extends
throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections" );
Map<String, Object> map = makeMap(QUEUE_OPERATION, CREATESHARD);
- copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR,CREATE_NODE_SET);
+ copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR,CREATE_NODE_SET, ASYNC);
copyPropertiesIfNotNull(req.getParams(), map);
ZkNodeProps m = new ZkNodeProps(map);
handleResponse(CREATESHARD, m, rsp);
@@ -485,6 +532,10 @@ public class CollectionsHandler extends
if (rangesStr != null) {
props.put(CoreAdminParams.RANGES, rangesStr);
}
+
+ if (req.getParams().get(ASYNC) != null)
+ props.put(ASYNC, req.getParams().get(ASYNC));
+
copyPropertiesIfNotNull(req.getParams(), props);
ZkNodeProps m = new ZkNodeProps(props);
@@ -497,7 +548,7 @@ public class CollectionsHandler extends
req.getParams().required().check("collection", "split.key", "target.collection");
Map<String,Object> props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.MIGRATE);
- copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout");
+ copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout", ASYNC);
ZkNodeProps m = new ZkNodeProps(props);
handleResponse(OverseerCollectionProcessor.MIGRATE, m, rsp, DEFAULT_ZK_TIMEOUT * 20);
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Fri Mar 14 08:01:18 2014
@@ -17,21 +17,8 @@
package org.apache.solr.handler.admin;
-import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.lucene.index.DirectoryReader;
@@ -55,6 +42,7 @@ import org.apache.solr.common.params.Cor
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CloseHook;
@@ -75,14 +63,31 @@ import org.apache.solr.update.SplitIndex
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
+import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.NumberUtils;
import org.apache.solr.util.RefCounted;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
/**
*
@@ -91,6 +96,24 @@ import com.google.common.collect.Lists;
public class CoreAdminHandler extends RequestHandlerBase {
protected static Logger log = LoggerFactory.getLogger(CoreAdminHandler.class);
protected final CoreContainer coreContainer;
+  /**
+   * Tracking maps for async CoreAdmin requests, keyed by status name ({@link #RUNNING},
+   * {@link #COMPLETED}, {@link #FAILED}). Each value is a synchronized, insertion-ordered map
+   * from task id to task. The field is static, so it is shared by every CoreAdminHandler in the JVM.
+   */
+  protected static final HashMap<String, Map<String, TaskObject>> requestStatusMap =
+      new HashMap<String, Map<String, TaskObject>>();
+
+  /**
+   * Pool that runs async CoreAdmin requests; created when an async request finds it null or shut down.
+   * NOTE(review): the lazy creation in handleRequestBody has no visible synchronization, so
+   * concurrent first requests may each create a pool. Confirm.
+   */
+  protected ExecutorService parallelExecutor = null;
+
+  /** Maximum number of tasks kept in the completed (and, separately, failed) map before the oldest is evicted. */
+  protected static int MAX_TRACKED_REQUESTS = 100;
+
+  // Status names and response keys of the REQUESTSTATUS protocol; final so they cannot be reassigned.
+  public static final String RUNNING = "running";
+  public static final String COMPLETED = "completed";
+  public static final String FAILED = "failed";
+  public static final String RESPONSE = "Response";
+  public static final String RESPONSE_STATUS = "STATUS";
+  public static final String RESPONSE_MESSAGE = "msg";
+
+  static {
+    requestStatusMap.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
+    requestStatusMap.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
+    requestStatusMap.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
+  }
public CoreAdminHandler() {
super();
@@ -136,6 +159,18 @@ public class CoreAdminHandler extends Re
"Core container instance missing");
}
//boolean doPersist = false;
+ String taskId = req.getParams().get("async");
+ TaskObject taskObject = new TaskObject(taskId);
+
+ if(taskId != null) {
+ // Put the tasks into the maps for tracking
+ if (getMap(RUNNING).containsKey(taskId) || getMap(COMPLETED).containsKey(taskId) || getMap(FAILED).containsKey(taskId)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Duplicate request with the same requestid found.");
+ }
+
+ addTask(RUNNING, taskObject);
+ }
// Pick the action
SolrParams params = req.getParams();
@@ -147,6 +182,19 @@ public class CoreAdminHandler extends Re
this.handleCustomAction(req, rsp);
}
}
+
+ if (taskId == null) {
+ handleRequestInternal(req, rsp, action);
+ } else {
+ ParallelCoreAdminHandlerThread parallelHandlerThread = new ParallelCoreAdminHandlerThread(req, rsp, action, taskObject);
+ if(parallelExecutor == null || parallelExecutor.isShutdown())
+ parallelExecutor = Executors.newFixedThreadPool(50,
+ new DefaultSolrThreadFactory("parallelCoreAdminExecutor"));
+ parallelExecutor.execute(parallelHandlerThread);
+ }
+ }
+
+ protected void handleRequestInternal(SolrQueryRequest req, SolrQueryResponse rsp, CoreAdminAction action) throws Exception {
if (action != null) {
switch (action) {
case CREATE: {
@@ -199,17 +247,17 @@ public class CoreAdminHandler extends Re
this.handleWaitForStateAction(req, rsp);
break;
}
-
+
case REQUESTRECOVERY: {
this.handleRequestRecoveryAction(req, rsp);
break;
}
-
+
case REQUESTSYNCSHARD: {
this.handleRequestSyncAction(req, rsp);
break;
}
-
+
// todo : Can this be done by the regular RecoveryStrategy route?
case REQUESTAPPLYUPDATES: {
this.handleRequestApplyUpdatesAction(req, rsp);
@@ -219,6 +267,10 @@ public class CoreAdminHandler extends Re
this.handleRequestBufferUpdatesAction(req, rsp);
break;
}
+ case REQUESTSTATUS: {
+ this.handleRequestActionStatus(req, rsp);
+ break;
+ }
case OVERSEEROP:{
ZkController zkController = coreContainer.getZkController();
if(zkController != null){
@@ -240,7 +292,7 @@ public class CoreAdminHandler extends Re
rsp.setHttpCaching(false);
}
-
+
/**
* Handle the core admin SPLIT action.
*/
@@ -755,6 +807,28 @@ public class CoreAdminHandler extends Re
}
/**
+ * Handle "REQUESTSTATUS" action
+ */
+  protected void handleRequestActionStatus(SolrQueryRequest req, SolrQueryResponse rsp) {
+    // Reports "running", "completed"/"failed" (plus the recorded response), or "notfound"
+    // for the task named by the requestid parameter.
+    SolrParams params = req.getParams();
+    String requestId = params.get(CoreAdminParams.REQUESTID);
+    log.info("Checking request status for : " + requestId);
+
+    if (mapContainsTask(RUNNING, requestId)) {
+      rsp.add(RESPONSE_STATUS, RUNNING);
+      return;
+    }
+    // Look each task up exactly once: a containsKey() check followed by a separate get() can
+    // race with eviction in addTask and dereference null.
+    TaskObject task = getTask(COMPLETED, requestId);
+    if (task != null) {
+      rsp.add(RESPONSE_STATUS, COMPLETED);
+      rsp.add(RESPONSE, task.getRspObject());
+      return;
+    }
+    task = getTask(FAILED, requestId);
+    if (task != null) {
+      rsp.add(RESPONSE_STATUS, FAILED);
+      rsp.add(RESPONSE, task.getRspObject());
+      return;
+    }
+    rsp.add(RESPONSE_STATUS, "notfound");
+    rsp.add(RESPONSE_MESSAGE, "No task found in running, completed or failed tasks");
+  }
+
+ /**
* Handle "SWAP" action
*/
protected void handleSwapAction(SolrQueryRequest req, SolrQueryResponse rsp) {
@@ -1172,4 +1246,123 @@ public class CoreAdminHandler extends Re
/** Returns the handler's source location string; "$URL$" looks like an SVN keyword placeholder (NOTE(review): confirm it is expanded on checkout). */
public String getSource() {
return "$URL$";
}
+
+ /**
+ * Class to implement multi-threaded CoreAdminHandler behaviour.
+ * This accepts all of the context from handleRequestBody.
+ */
+ protected class ParallelCoreAdminHandlerThread implements Runnable {
+ SolrQueryRequest req;
+ SolrQueryResponse rsp;
+ CoreAdminAction action;
+ TaskObject taskObject;
+
+ public ParallelCoreAdminHandlerThread (SolrQueryRequest req, SolrQueryResponse rsp,
+ CoreAdminAction action, TaskObject taskObject){
+ this.req = req;
+ this.rsp = rsp;
+ this.action = action;
+ this.taskObject = taskObject;
+ }
+
+ public void run() {
+ boolean exceptionCaught = false;
+ try {
+ handleRequestInternal(req, rsp, action);
+ taskObject.setRspObject(rsp);
+ } catch (Exception e) {
+ exceptionCaught = true;
+ taskObject.setRspObjectFromException(e);
+ } finally {
+ removeTask("running", taskObject.taskId);
+ if(exceptionCaught) {
+ addTask("failed", taskObject, true);
+ } else
+ addTask("completed", taskObject, true);
+ }
+
+ }
+
+ }
+
+ /**
+ * Helper class to manage the tasks to be tracked.
+ * This contains the taskId, request and the response (if available).
+ */
+ private class TaskObject {
+ String taskId;
+ String rspInfo;
+
+ public TaskObject(String taskId) {
+ this.taskId = taskId;
+ }
+
+ public String getRspObject() {
+ return rspInfo;
+ }
+
+ public void setRspObject(SolrQueryResponse rspObject) {
+ this.rspInfo = rspObject.getToLogAsString("TaskId: " + this.taskId + " ");
+ }
+
+ public void setRspObjectFromException(Exception e) {
+ this.rspInfo = e.getMessage();
+ }
+ }
+
+  /**
+   * Helper method to add a task to a tracking map. When {@code limit} is true, the oldest
+   * (first-inserted) entries are evicted first so the map holds at most
+   * {@link #MAX_TRACKED_REQUESTS} tasks after the insert.
+   */
+  protected void addTask(String map, TaskObject o, boolean limit) {
+    Map<String, TaskObject> tasks = getMap(map);
+    // The size check, the iteration over the synchronizedMap and the put must all happen under its
+    // lock. Otherwise concurrent adders can overshoot the limit, after which an '==' test never fires.
+    synchronized (tasks) {
+      if (limit) {
+        Iterator<String> oldest = tasks.keySet().iterator();
+        while (tasks.size() >= MAX_TRACKED_REQUESTS && oldest.hasNext()) {
+          oldest.next();
+          oldest.remove();
+        }
+      }
+      tasks.put(o.taskId, o);
+    }
+  }
+
+
+ protected void addTask(String map, TaskObject o) {
+ synchronized (getMap(map)) {
+ getMap(map).put(o.taskId, o);
+ }
+ }
+
+ /**
+ * Helper method to remove a task from a tracking map.
+ */
+ protected void removeTask(String map, String taskId) {
+ synchronized (getMap(map)) {
+ getMap(map).remove(taskId);
+ }
+ }
+
+ /**
+ * Helper method to check if a map contains a taskObject with the given taskId.
+ */
+ protected boolean mapContainsTask(String map, String taskId) {
+ return getMap(map).containsKey(taskId);
+ }
+
+ /**
+ * Helper method to get a TaskObject given a map and a taskId.
+ */
+ protected TaskObject getTask(String map, String taskId) {
+ return getMap(map).get(taskId);
+ }
+
+ /**
+ * Helper method to get a request status map given the name.
+ */
+ private Map<String, TaskObject> getMap(String map) {
+ return requestStatusMap.get(map);
+ }
+
+  /**
+   * Shuts down the async-request executor, if one was created and is not already shut down,
+   * using {@code ExecutorUtil.shutdownAndAwaitTermination}.
+   */
+  public void shutdown() {
+    final ExecutorService executor = parallelExecutor;
+    if (executor == null || executor.isShutdown()) {
+      return;
+    }
+    ExecutorUtil.shutdownAndAwaitTermination(executor);
+  }
}
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java?rev=1577444&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java Fri Mar 14 08:01:18 2014
@@ -0,0 +1,125 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.junit.Before;
+
+import java.io.IOException;
+
+public class AsyncMigrateRouteKeyTest extends MigrateRouteKeyTest {
+
+ public AsyncMigrateRouteKeyTest() {
+ schemaString = "schema15.xml"; // we need a string id
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ public void doTest() throws Exception {
+ waitForThingsToLevelOut(15);
+
+ multipleShardMigrateTest();
+ printLayout();
+ }
+
+ protected void checkAsyncRequestForCompletion(String asyncId) throws SolrServerException, IOException {
+ ModifiableSolrParams params;
+ String message;
+ params = new ModifiableSolrParams();
+ params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
+ params.set(OverseerCollectionProcessor.REQUESTID, asyncId);
+ message = sendStatusRequestWithRetry(params, 10);
+ assertEquals("Task " + asyncId + " not found in completed tasks.",
+ "found " + asyncId + " in completed tasks", message);
+ }
+
+ @Override
+ protected void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ String asyncId = "20140128";
+ params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.MIGRATE.toString());
+ params.set("collection", sourceCollection);
+ params.set("target.collection", targetCollection);
+ params.set("split.key", splitKey);
+ params.set("forward.timeout", 45);
+ params.set("async", asyncId);
+
+ invoke(params);
+
+ checkAsyncRequestForCompletion(asyncId);
+ }
+
+ /**
+ * Helper method to send a status request with specific retry limit and return
+ * the message/null from the success response.
+ */
+ private String sendStatusRequestWithRetry(ModifiableSolrParams params, int maxCounter)
+ throws SolrServerException, IOException {
+ NamedList status = null;
+ String state = null;
+ String message = null;
+ NamedList r;
+ while (maxCounter-- > 0) {
+ r = sendRequest(params);
+ status = (NamedList) r.get("status");
+ state = (String) status.get("state");
+ message = (String) status.get("msg");
+
+ if (state.equals("completed") || state.equals("failed"))
+ return (String) status.get("msg");
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+
+ }
+
+ }
+ // Return last state?
+ return message;
+ }
+
+ protected NamedList sendRequest(ModifiableSolrParams params) throws SolrServerException, IOException {
+ SolrRequest request = new QueryRequest(params);
+ request.setPath("/admin/collections");
+
+ String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
+ .getBaseURL();
+ baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
+
+ HttpSolrServer baseServer = null;
+
+ try {
+ baseServer = new HttpSolrServer(baseUrl);
+ baseServer.setConnectionTimeout(15000);
+ return baseServer.request(request);
+ } finally {
+ baseServer.shutdown();
+ }
+ }
+}
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java?rev=1577444&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java Fri Mar 14 08:01:18 2014
@@ -0,0 +1,185 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer.RemoteSolrException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.CoreAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.params.MapSolrParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.SolrInfoMBean.Category;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.servlet.SolrDispatchFilter;
+import org.apache.solr.update.DirectUpdateHandler2;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.management.ObjectName;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
+
+/**
+ * Tests the Cloud Collections API.
+ */
+@Slow
+public class CollectionsAPIAsyncDistributedZkTest extends AbstractFullDistribZkTestBase {
+
+ private static final boolean DEBUG = false;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ useJettyDataDir = false;
+
+ System.setProperty("numShards", Integer.toString(sliceCount));
+ System.setProperty("solr.xml.persist", "true");
+ }
+
+ public CollectionsAPIAsyncDistributedZkTest() {
+ fixShardCount = true;
+
+ sliceCount = 2;
+ shardCount = 4;
+ }
+
+ @Override
+ public void doTest() throws Exception {
+
+ testSolrJAPICalls();
+
+ if (DEBUG) {
+ super.printLayout();
+ }
+ }
+
+ private void testSolrJAPICalls() throws Exception {
+ SolrServer server = createNewSolrServer("", getBaseUrl((HttpSolrServer) clients.get(0)));
+ CollectionAdminRequest.createCollection("testasynccollectioncreation", 2, "conf1", server, "1001");
+ String state = null;
+
+ state = getRequestStateAfterCompletion("1001", 10, server);
+
+ assertEquals("CreateCollection task did not complete!", "completed", state);
+
+ CollectionAdminRequest.createCollection("testasynccollectioncreation", 2, "conf1", server, "1002");
+
+ state = getRequestStateAfterCompletion("1002", 3, server);
+
+ assertEquals("Recreating a collection with the same name didn't fail, should have.", "failed", state);
+
+ CollectionAdminRequest.splitShard("testasynccollectioncreation", "shard1", server, "1003");
+
+ state = getRequestStateAfterCompletion("1003", 60, server);
+
+ assertEquals("Shard split did not complete. Last recorded state: " + state, "completed", state);
+ }
+
+ private String getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrServer server)
+ throws IOException, SolrServerException {
+ String state = null;
+ while(waitForSeconds-- > 0) {
+ state = getRequestState(requestId, server);
+ if(state.equals("completed") || state.equals("failed"))
+ return state;
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ return state;
+ }
+
+ private String getRequestState(String requestId, SolrServer server) throws IOException, SolrServerException {
+ CollectionAdminResponse response = CollectionAdminRequest.requestStatus(requestId, server);
+ NamedList innerResponse = (NamedList) response.getResponse().get("status");
+ return (String) innerResponse.get("state");
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ System.clearProperty("numShards");
+ System.clearProperty("zkHost");
+ System.clearProperty("solr.xml.persist");
+
+ // insurance
+ DirectUpdateHandler2.commitOnClose = true;
+ }
+
+}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java Fri Mar 14 08:01:18 2014
@@ -114,7 +114,7 @@ public class MigrateRouteKeyTest extends
return ruleRemoved;
}
- private void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
+ protected void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.MIGRATE.toString());
params.set("collection", sourceCollection);
@@ -125,7 +125,7 @@ public class MigrateRouteKeyTest extends
invoke(params);
}
- private void invoke(ModifiableSolrParams params) throws SolrServerException, IOException {
+ protected void invoke(ModifiableSolrParams params) throws SolrServerException, IOException {
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
@@ -161,7 +161,7 @@ public class MigrateRouteKeyTest extends
waitForRecoveriesToFinish(targetCollection, false);
}
- private void multipleShardMigrateTest() throws Exception {
+ protected void multipleShardMigrateTest() throws Exception {
del("*:*");
commit();
assertTrue(cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound() == 0);
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java?rev=1577444&r1=1577443&r2=1577444&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java Fri Mar 14 08:01:18 2014
@@ -71,6 +71,9 @@ public class OverseerCollectionProcessor
private static final String CONFIG_NAME = "myconfig";
private static DistributedQueue workQueueMock;
+ private static DistributedMap runningMapMock;
+ private static DistributedMap completedMapMock;
+ private static DistributedMap failureMapMock;
private static ShardHandler shardHandlerMock;
private static ZkStateReader zkStateReaderMock;
private static ClusterState clusterStateMock;
@@ -90,8 +93,10 @@ public class OverseerCollectionProcessor
public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
String myId, ShardHandler shardHandler, String adminPath,
- DistributedQueue workQueue) {
- super(zkStateReader, myId, shardHandler, adminPath, workQueue);
+ DistributedQueue workQueue, DistributedMap runningMap,
+ DistributedMap completedMap,
+ DistributedMap failureMap) {
+ super(zkStateReader, myId, shardHandler, adminPath, workQueue, runningMap, completedMap, failureMap);
}
@Override
@@ -111,6 +116,9 @@ public class OverseerCollectionProcessor
@BeforeClass
public static void setUpOnce() throws Exception {
workQueueMock = createMock(DistributedQueue.class);
+ runningMapMock = createMock(DistributedMap.class);
+ completedMapMock = createMock(DistributedMap.class);
+ failureMapMock = createMock(DistributedMap.class);
shardHandlerMock = createMock(ShardHandler.class);
zkStateReaderMock = createMock(ZkStateReader.class);
clusterStateMock = createMock(ClusterState.class);
@@ -120,6 +128,9 @@ public class OverseerCollectionProcessor
@AfterClass
public static void tearDownOnce() {
workQueueMock = null;
+ runningMapMock = null;
+ completedMapMock = null;
+ failureMapMock = null;
shardHandlerMock = null;
zkStateReaderMock = null;
clusterStateMock = null;
@@ -131,13 +142,16 @@ public class OverseerCollectionProcessor
super.setUp();
queue.clear();
reset(workQueueMock);
- reset(workQueueMock);
+ reset(runningMapMock);
+ reset(completedMapMock);
+ reset(failureMapMock);
reset(shardHandlerMock);
reset(zkStateReaderMock);
reset(clusterStateMock);
reset(solrZkClientMock);
underTest = new OverseerCollectionProcessorToBeTested(zkStateReaderMock,
- "1234", shardHandlerMock, ADMIN_PATH, workQueueMock);
+ "1234", shardHandlerMock, ADMIN_PATH, workQueueMock, runningMapMock,
+ completedMapMock, failureMapMock);
zkMap.clear();
collectionsSet.clear();
}
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java?rev=1577444&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java Fri Mar 14 08:01:18 2014
@@ -0,0 +1,224 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.junit.Before;
+
+import java.io.IOException;
+
+public class TestRequestStatusCollectionAPI extends BasicDistributedZkTest {
+
+ public TestRequestStatusCollectionAPI() {
+ schemaString = "schema15.xml"; // we need a string id
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ }
+
+ public void doTest() {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+
+ params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
+ params.set("name", "collection2");
+ params.set("numShards", 2);
+ params.set("replicationFactor", 1);
+ params.set("maxShardsPerNode", 100);
+ params.set("collection.configName", "conf1");
+ params.set("async", "1000");
+ try {
+ sendRequest(params);
+ } catch (SolrServerException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ // Check for the request to be completed.
+ int maxCounter = 10;
+
+ NamedList r = null;
+ NamedList status = null;
+ String message = null;
+
+ params = new ModifiableSolrParams();
+
+ params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
+ params.set(OverseerCollectionProcessor.REQUESTID, "1000");
+
+ try {
+ message = sendStatusRequestWithRetry(params, 10);
+ } catch (SolrServerException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ assertEquals("Task 1000 not found in completed tasks.", "found 1000 in completed tasks", message);
+
+ // Check for a random (hopefully non-existent request id
+ params = new ModifiableSolrParams();
+ params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.REQUESTSTATUS.toString());
+ params.set(OverseerCollectionProcessor.REQUESTID, "9999999");
+ try {
+ r = sendRequest(params);
+ status = (NamedList) r.get("status");
+ message = (String) status.get("msg");
+ } catch (SolrServerException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ assertEquals("Task 9999999 found in tasks queue.", "Did not find taskid [9999999] in any tasks queue", message);
+
+ params = new ModifiableSolrParams();
+ params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.SPLITSHARD.toString());
+ params.set("collection", "collection2");
+ params.set("shard", "shard1");
+ params.set("async", "1001");
+ try {
+ sendRequest(params);
+ } catch (SolrServerException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ // Check for the request to be completed.
+ params = new ModifiableSolrParams();
+ params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
+ params.set(OverseerCollectionProcessor.REQUESTID, "1001");
+ try {
+ message = sendStatusRequestWithRetry(params, maxCounter);
+ } catch (SolrServerException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ assertEquals("Task 1001 not found in completed tasks.", "found 1001 in completed tasks", message);
+
+ params = new ModifiableSolrParams();
+ params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
+ params.set("name", "collection2");
+ params.set("numShards", 2);
+ params.set("replicationFactor", 1);
+ params.set("maxShardsPerNode", 100);
+ params.set("collection.configName", "conf1");
+ params.set("async", "1002");
+ try {
+ sendRequest(params);
+ } catch (SolrServerException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+
+ params = new ModifiableSolrParams();
+
+ params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
+ params.set(OverseerCollectionProcessor.REQUESTID, "1002");
+
+ try {
+ message = sendStatusRequestWithRetry(params, 10);
+ } catch (SolrServerException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+
+ assertEquals("Task 1002 not found in completed tasks.", "found 1002 in failed tasks", message);
+
+ params = new ModifiableSolrParams();
+ params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.CREATE.toString());
+ params.set("name", "collection3");
+ params.set("numShards", 1);
+ params.set("replicationFactor", 1);
+ params.set("maxShardsPerNode", 100);
+ params.set("collection.configName", "conf1");
+ params.set("async", "1002");
+ try {
+ r = sendRequest(params);
+ } catch (SolrServerException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ assertEquals("Did not error out on duplicate requests (same request id)",
+ "Task with the same requestid already exists.", r.get("error"));
+
+ }
+
+
+ /**
+ * Helper method to send a status request with specific retry limit and return
+ * the message/null from the success response.
+ */
+ private String sendStatusRequestWithRetry(ModifiableSolrParams params, int maxCounter)
+ throws SolrServerException, IOException{
+ NamedList status = null;
+ String state = null;
+ String message = null;
+ NamedList r;
+ while(maxCounter-- > 0) {
+ r = sendRequest(params);
+ status = (NamedList) r.get("status");
+ state = (String) status.get("state");
+ message = (String) status.get("msg");
+
+ if(state.equals("completed") || state.equals("failed"))
+ return (String) status.get("msg");
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+
+ }
+
+ }
+ // Return last state?
+ return message;
+ }
+
+ protected NamedList sendRequest(ModifiableSolrParams params) throws SolrServerException, IOException {
+ SolrRequest request = new QueryRequest(params);
+ request.setPath("/admin/collections");
+
+ String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
+ .getBaseURL();
+ baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
+
+ HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
+ baseServer.setConnectionTimeout(15000);
+ return baseServer.request(request);
+ }
+}