You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by an...@apache.org on 2014/03/17 22:29:03 UTC
svn commit: r1578598 - in /lucene/dev/branches/branch_4x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/cloud/
solr/core/src/java/org/apache/solr/core/
solr/core/src/java/org/apache/solr/handler/admin/
solr/core/src/test/org/apache/solr/cloud/...
Author: anshum
Date: Mon Mar 17 21:29:03 2014
New Revision: 1578598
URL: http://svn.apache.org/r1578598
Log:
SOLR-5477: Async execution of OverseerCollectionProcessor tasks (merged trunk r1577444, r1577801, r1577965)
Added:
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
- copied unchanged from r1577444, lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java
- copied unchanged from r1577444, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
- copied unchanged from r1577444, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIAsyncDistributedZkTest.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java
- copied unchanged from r1577444, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/handler/admin/CoreAdminRequestStatusTest.java
- copied unchanged from r1577444, lucene/dev/trunk/solr/core/src/test/org/apache/solr/handler/admin/CoreAdminRequestStatusTest.java
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/solr/ (props changed)
lucene/dev/branches/branch_4x/solr/CHANGES.txt
lucene/dev/branches/branch_4x/solr/core/ (props changed)
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
lucene/dev/branches/branch_4x/solr/solrj/ (props changed)
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Mon Mar 17 21:29:03 2014
@@ -70,12 +70,14 @@ New Features
* SOLR-5720: Add ExpandComponent to expand results collapsed by the
CollapsingQParserPlugin. (Joel Bernstein)
-
* SOLR-3177: Enable tagging and excluding filters in StatsComponent via the
localParams syntax. (Mathias H., Nikolai Luthman, Vitaliy Zhovtyuk, shalin)
* SOLR-1604: Wildcards, ORs etc inside Phrase Queries. (Ahmet Arslan via Erick Erickson)
+* SOLR-5477: Async execution of OverseerCollectionProcessor(CollectionsAPI)
+ tasks. (Anshum Gupta)
+
Bug Fixes
----------------------
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Mar 17 21:29:03 2014
@@ -81,6 +81,13 @@ public class Overseer {
//Internal queue where overseer stores events that have not yet been published into cloudstate
//If Overseer dies while extracting the main queue a new overseer will start from this queue
private final DistributedQueue workQueue;
+ // Internal map which holds the information about running tasks.
+ private final DistributedMap runningMap;
+ // Internal map which holds the information about successfully completed tasks.
+ private final DistributedMap completedMap;
+ // Internal map which holds the information about failed tasks.
+ private final DistributedMap failureMap;
+
private Map clusterProps;
private boolean isClosed = false;
@@ -88,6 +95,9 @@ public class Overseer {
this.zkClient = reader.getZkClient();
this.stateUpdateQueue = getInQueue(zkClient);
this.workQueue = getInternalQueue(zkClient);
+ this.failureMap = getFailureMap(zkClient);
+ this.runningMap = getRunningMap(zkClient);
+ this.completedMap = getCompletedMap(zkClient);
this.myId = myId;
this.reader = reader;
clusterProps = reader.getClusterProps();
@@ -135,7 +145,7 @@ public class Overseer {
}
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(clusterState), true);
-
+
workQueue.poll(); // poll-ing removes the element we got by peek-ing
}
else {
@@ -1135,6 +1145,24 @@ public class Overseer {
createOverseerNode(zkClient);
return new DistributedQueue(zkClient, "/overseer/queue-work", null);
}
+
+ /* Internal map for failed tasks, not to be used outside of the Overseer */
+ static DistributedMap getRunningMap(final SolrZkClient zkClient) {
+ createOverseerNode(zkClient);
+ return new DistributedMap(zkClient, "/overseer/collection-map-running", null);
+ }
+
+ /* Internal map for successfully completed tasks, not to be used outside of the Overseer */
+ static DistributedMap getCompletedMap(final SolrZkClient zkClient) {
+ createOverseerNode(zkClient);
+ return new DistributedMap(zkClient, "/overseer/collection-map-completed", null);
+ }
+
+ /* Internal map for failed tasks, not to be used outside of the Overseer */
+ static DistributedMap getFailureMap(final SolrZkClient zkClient) {
+ createOverseerNode(zkClient);
+ return new DistributedMap(zkClient, "/overseer/collection-map-failure", null);
+ }
/* Collection creation queue */
static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Mon Mar 17 21:29:03 2014
@@ -61,10 +61,8 @@ import org.apache.solr.handler.component
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +80,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.solr.cloud.Assign.Node;
@@ -118,16 +117,22 @@ public class OverseerCollectionProcessor
public static final String DELETESHARD = "deleteshard";
+ public static final String REQUESTSTATUS = "status";
+
public static final String ROUTER = "router";
public static final String SHARDS_PROP = "shards";
+ public static final String ASYNC = "async";
+
public static final String CREATESHARD = "createshard";
public static final String DELETEREPLICA = "deletereplica";
public static final String MIGRATE = "migrate";
+ public static final String REQUESTID = "requestid";
+
public static final String COLL_CONF = "collection.configName";
public static final String COLL_PROP_PREFIX = "property.";
@@ -149,6 +154,9 @@ public class OverseerCollectionProcessor
.getLogger(OverseerCollectionProcessor.class);
private DistributedQueue workQueue;
+ private DistributedMap runningMap;
+ private DistributedMap completedMap;
+ private DistributedMap failureMap;
private String myId;
@@ -161,15 +169,25 @@ public class OverseerCollectionProcessor
private boolean isClosed;
public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
- this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()));
+ this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()),
+ Overseer.getRunningMap(zkStateReader.getZkClient()),
+ Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient()));
}
- protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath, DistributedQueue workQueue) {
+ protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler,
+ String adminPath,
+ DistributedQueue workQueue,
+ DistributedMap runningMap,
+ DistributedMap completedMap,
+ DistributedMap failureMap) {
this.zkStateReader = zkStateReader;
this.myId = myId;
this.shardHandler = shardHandler;
this.adminPath = adminPath;
this.workQueue = workQueue;
+ this.runningMap = runningMap;
+ this.completedMap = completedMap;
+ this.failureMap = failureMap;
}
@Override
@@ -200,11 +218,35 @@ public class OverseerCollectionProcessor
QueueEvent head = workQueue.peek(true);
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+
+ final String asyncId = (message.containsKey(ASYNC) && message.get(ASYNC) != null) ? (String) message.get(ASYNC) : null;
+
+ try {
+ if(message.containsKey(ASYNC) && message.get(ASYNC) != null && !runningMap.contains(message.getStr(ASYNC)))
+ runningMap.put(asyncId, null);
+ } catch (KeeperException.NodeExistsException e) {
+ // Just catch and do nothing. The runningMap.contains(..) check ensures that this is the only
+ // entry point into the runningMap.
+ // NOTE: Make sure to handle it as soon as OCP gets distributed/multi-threaded.
+ }
+
log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
final String operation = message.getStr(QUEUE_OPERATION);
SolrResponse response = processMessage(message, operation);
+
head.setBytes(SolrResponse.serializable(response));
+ if (!operation.equals(REQUESTSTATUS) && asyncId != null) {
+ if(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null) {
+ failureMap.put(asyncId, null);
+ } else {
+ completedMap.put(asyncId, null);
+ }
+ }
+ if(asyncId != null)
+ runningMap.remove(asyncId);
+
workQueue.remove(head);
+
log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -412,9 +454,9 @@ public class OverseerCollectionProcessor
processRoleCommand(message, operation);
} else if (ADDREPLICA.isEqual(operation)) {
addReplica(zkStateReader.getClusterState(), message, results);
- }
-
- else {
+ } else if (REQUESTSTATUS.equals(operation)) {
+ requestStatus(message, results);
+ } else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation);
}
@@ -764,13 +806,7 @@ public class OverseerCollectionProcessor
}
- ShardResponse srsp;
- do {
- srsp = shardHandler.takeCompletedOrError();
- if (srsp != null) {
- processResponse(results, srsp);
- }
- } while (srsp != null);
+ processResponses(results);
log.info("Finished create command on all shards for collection: "
+ collectionName);
@@ -931,6 +967,9 @@ public class OverseerCollectionProcessor
// the only side effect of this is that the sub shard may end up having more replicas than we want
collectShardResponses(results, false, null);
+ String asyncId = message.getStr(ASYNC);
+ HashMap<String, String> requestMap = new HashMap<String, String>();
+
for (int i=0; i<subRanges.size(); i++) {
String subSlice = subSlices.get(i);
String subShardName = subShardNames.get(i);
@@ -957,12 +996,15 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.NAME, subShardName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, subSlice);
+ setupAsyncRequest(asyncId, requestMap, params, nodeName);
addPropertyParams(message, params);
sendShardRequest(nodeName, params);
}
collectShardResponses(results, true,
- "SPLTSHARD failed to create subshard leaders");
+ "SPLITSHARD failed to create subshard leaders");
+
+ completeAsyncRequest(asyncId, requestMap, results);
for (String subShardName : subShardNames) {
// wait for parent leader to acknowledge the sub-shard core
@@ -975,12 +1017,18 @@ public class OverseerCollectionProcessor
cmd.setState(ZkStateReader.ACTIVE);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
- sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
+
+ ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
+ setupAsyncRequest(asyncId, requestMap, p, nodeName);
+
+ sendShardRequest(nodeName, p);
}
collectShardResponses(results, true,
- "SPLTSHARD timed out waiting for subshard leaders to come up");
-
+ "SPLITSHARD timed out waiting for subshard leaders to come up");
+
+ completeAsyncRequest(asyncId, requestMap, results);
+
log.info("Successfully created all sub-shards for collection "
+ collectionName + " parent shard: " + slice + " on: " + parentShardLeader);
@@ -996,9 +1044,12 @@ public class OverseerCollectionProcessor
params.add(CoreAdminParams.TARGET_CORE, subShardName);
}
params.set(CoreAdminParams.RANGES, rangesStr);
+ setupAsyncRequest(asyncId, requestMap, params, parentShardLeader.getNodeName());
sendShardRequest(parentShardLeader.getNodeName(), params);
+
collectShardResponses(results, true, "SPLITSHARD failed to invoke SPLIT core admin command");
+ completeAsyncRequest(asyncId, requestMap, results);
log.info("Index on shard: " + nodeName + " split into two successfully");
@@ -1012,12 +1063,16 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
params.set(CoreAdminParams.NAME, subShardName);
+ setupAsyncRequest(asyncId, requestMap, params, nodeName);
+
sendShardRequest(nodeName, params);
}
collectShardResponses(results, true,
"SPLITSHARD failed while asking sub shard leaders to apply buffered updates");
+ completeAsyncRequest(asyncId, requestMap, results);
+
log.info("Successfully applied buffered updates on : " + subShardNames);
// Replica creation for the new Slices
@@ -1067,6 +1122,12 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.NAME, shardName);
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
+ if(asyncId != null) {
+ String requestId = asyncId + Math.abs(System.nanoTime());
+ params.set(ASYNC, requestId);
+ requestMap.put(subShardNodeName, requestId);
+ }
+
addPropertyParams(message, params);
// TODO: Figure the config used by the parent shard and use it.
//params.set("collection.configName", configName);
@@ -1086,12 +1147,19 @@ public class OverseerCollectionProcessor
cmd.setState(ZkStateReader.RECOVERING);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
- sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
+ ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
+
+ setupAsyncRequest(asyncId, requestMap, p, nodeName);
+
+ sendShardRequest(nodeName, p);
+
}
}
collectShardResponses(results, true,
- "SPLTSHARD failed to create subshard replicas or timed out waiting for them to come up");
+ "SPLITSHARD failed to create subshard replicas or timed out waiting for them to come up");
+
+ completeAsyncRequest(asyncId, requestMap, results);
log.info("Successfully created all replica shards for all sub-slices " + subSlices);
@@ -1211,7 +1279,40 @@ public class OverseerCollectionProcessor
} while (srsp != null);
}
-
+ private void requestStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+ log.info("Request status invoked");
+ String requestId = message.getStr(REQUESTID);
+
+ // Special taskId (-1), clears up the request state maps.
+ if(requestId.equals("-1")) {
+ completedMap.clear();
+ failureMap.clear();
+ return;
+ }
+
+ if(completedMap.contains(requestId)) {
+ SimpleOrderedMap success = new SimpleOrderedMap();
+ success.add("state", "completed");
+ success.add("msg", "found " + requestId + " in completed tasks");
+ results.add("status", success);
+ } else if (runningMap.contains(requestId)) {
+ SimpleOrderedMap success = new SimpleOrderedMap();
+ success.add("state", "running");
+ success.add("msg", "found " + requestId + " in submitted tasks");
+ results.add("status", success);
+ } else if (failureMap.contains(requestId)) {
+ SimpleOrderedMap success = new SimpleOrderedMap();
+ success.add("state", "failed");
+ success.add("msg", "found " + requestId + " in failed tasks");
+ results.add("status", success);
+ } else {
+ SimpleOrderedMap failure = new SimpleOrderedMap();
+ failure.add("state", "notfound");
+ failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue");
+ results.add("status", failure);
+ }
+ }
+
private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
log.info("Delete shard invoked");
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
@@ -1222,7 +1323,7 @@ public class OverseerCollectionProcessor
if (slice == null) {
if(clusterState.hasCollection(collection)) {
throw new SolrException(ErrorCode.BAD_REQUEST,
- "No shard with the specified name exists: " + slice);
+ "No shard with the specified name exists: " + slice.getName());
} else {
throw new SolrException(ErrorCode.BAD_REQUEST,
"No collection with the specified name exists: " + collection);
@@ -1242,13 +1343,7 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.DELETE_INDEX, "true");
sliceCmd(clusterState, params, null, slice);
- ShardResponse srsp;
- do {
- srsp = shardHandler.takeCompletedOrError();
- if (srsp != null) {
- processResponse(results, srsp);
- }
- } while (srsp != null);
+ processResponses(results);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
Overseer.REMOVESHARD, ZkStateReader.COLLECTION_PROP, collection,
@@ -1314,21 +1409,29 @@ public class OverseerCollectionProcessor
"No active slices available in target collection: " + targetCollection + "for given split.key: " + splitKey);
}
+ String asyncId = null;
+ if(message.containsKey(ASYNC) && message.get(ASYNC) != null)
+ asyncId = message.getStr(ASYNC);
+
for (Slice sourceSlice : sourceSlices) {
for (Slice targetSlice : targetSlices) {
log.info("Migrating source shard: {} to target shard: {} for split.key = " + splitKey, sourceSlice, targetSlice);
- migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey, timeout, results);
+ migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey, timeout, results, asyncId);
}
}
}
- private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice, DocCollection targetCollection, Slice targetSlice, String splitKey, int timeout, NamedList results) throws KeeperException, InterruptedException {
+ private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
+ DocCollection targetCollection, Slice targetSlice,
+ String splitKey, int timeout,
+ NamedList results, String asyncId) throws KeeperException, InterruptedException {
String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
if (clusterState.hasCollection(tempSourceCollectionName)) {
log.info("Deleting temporary collection: " + tempSourceCollectionName);
Map<String, Object> props = ZkNodeProps.makeMap(
QUEUE_OPERATION, DELETECOLLECTION,
"name", tempSourceCollectionName);
+
try {
deleteCollection(new ZkNodeProps(props), results);
} catch (Exception e) {
@@ -1350,15 +1453,23 @@ public class OverseerCollectionProcessor
log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
+ // For tracking async calls.
+ HashMap<String, String> requestMap = new HashMap<String, String>();
log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
+ targetLeader.getStr("core") + " to buffer updates");
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTBUFFERUPDATES.toString());
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+ String nodeName = targetLeader.getNodeName();
+ setupAsyncRequest(asyncId, requestMap, params, nodeName);
+
sendShardRequest(targetLeader.getNodeName(), params);
+
collectShardResponses(results, true, "MIGRATE failed to request node to buffer updates");
+ completeAsyncRequest(asyncId, requestMap, results);
+
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, Overseer.ADD_ROUTING_RULE,
COLLECTION_PROP, sourceCollection.getName(),
@@ -1405,6 +1516,11 @@ public class OverseerCollectionProcessor
NUM_SLICES, 1,
COLL_CONF, configName,
CREATE_NODE_SET, sourceLeader.getNodeName());
+ if(asyncId != null) {
+ String internalAsyncId = asyncId + Math.abs(System.nanoTime());
+ props.put(ASYNC, internalAsyncId);
+ }
+
log.info("Creating temporary collection: " + props);
createCollection(clusterState, new ZkNodeProps(props), results);
// refresh cluster state
@@ -1437,8 +1553,13 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.RANGES, splitRange.toString());
params.set("split.key", splitKey);
- sendShardRequest(sourceLeader.getNodeName(), params);
+ String tempNodeName = sourceLeader.getNodeName();
+
+ setupAsyncRequest(asyncId, requestMap, params, tempNodeName);
+
+ sendShardRequest(tempNodeName, params);
collectShardResponses(results, true, "MIGRATE failed to invoke SPLIT core admin command");
+ completeAsyncRequest(asyncId, requestMap, results);
log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
tempSourceCollectionName, targetLeader.getNodeName());
@@ -1448,7 +1569,13 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.NAME, tempCollectionReplica2);
params.set(CoreAdminParams.COLLECTION, tempSourceCollectionName);
params.set(CoreAdminParams.SHARD, tempSourceSlice.getName());
+
+ setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
sendShardRequest(targetLeader.getNodeName(), params);
+ collectShardResponses(results, true,
+ "MIGRATE failed to create replica of temporary collection in target leader node.");
+
+ completeAsyncRequest(asyncId, requestMap, results);
coreNodeName = waitForCoreNodeName(clusterState.getCollection(tempSourceCollectionName),
zkStateReader.getBaseUrlForNodeName(targetLeader.getNodeName()), tempCollectionReplica2);
@@ -1458,14 +1585,19 @@ public class OverseerCollectionProcessor
cmd.setCoreName(tempSourceLeader.getStr("core"));
cmd.setNodeName(targetLeader.getNodeName());
cmd.setCoreNodeName(coreNodeName);
- cmd.setState(ZkStateReader.ACTIVE); // todo introduce asynchronous actions
+ cmd.setState(ZkStateReader.ACTIVE);
cmd.setCheckLive(true);
cmd.setOnlyIfLeader(true);
- sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()));
+ params = new ModifiableSolrParams(cmd.getParams());
+
+ setupAsyncRequest(asyncId, requestMap, params, tempSourceLeader.getNodeName());
+
+ sendShardRequest(tempSourceLeader.getNodeName(), params);
collectShardResponses(results, true,
"MIGRATE failed to create temp collection replica or timed out waiting for them to come up");
+ completeAsyncRequest(asyncId, requestMap, results);
log.info("Successfully created replica of temp source collection on target leader node");
log.info("Requesting merge of temp source collection replica to target leader");
@@ -1473,18 +1605,27 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.ACTION, CoreAdminAction.MERGEINDEXES.toString());
params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
+
+ setupAsyncRequest(asyncId, requestMap, params, sourceLeader.getNodeName());
+
sendShardRequest(targetLeader.getNodeName(), params);
collectShardResponses(results, true,
"MIGRATE failed to merge " + tempCollectionReplica2 + " to " + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName());
+ completeAsyncRequest(asyncId, requestMap, results);
+
log.info("Asking target leader to apply buffered updates");
params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+ setupAsyncRequest(asyncId, requestMap, params, targetLeader.getNodeName());
+
sendShardRequest(targetLeader.getNodeName(), params);
collectShardResponses(results, true,
"MIGRATE failed to request node to apply buffered updates");
+ completeAsyncRequest(asyncId, requestMap, results);
+
try {
log.info("Deleting temporary collection: " + tempSourceCollectionName);
props = ZkNodeProps.makeMap(
@@ -1497,6 +1638,21 @@ public class OverseerCollectionProcessor
}
}
+ private void completeAsyncRequest(String asyncId, HashMap<String, String> requestMap, NamedList results) {
+ if(asyncId != null) {
+ waitForAsyncCallsToComplete(requestMap, results);
+ requestMap.clear();
+ }
+ }
+
+ private void setupAsyncRequest(String asyncId, HashMap<String, String> requestMap, ModifiableSolrParams params, String nodeName) {
+ if(asyncId != null) {
+ String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
+ params.set(ASYNC, coreAdminAsyncId);
+ requestMap.put(nodeName, coreAdminAsyncId);
+ }
+ }
+
private DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
if (a == null || b == null || !a.overlaps(b)) {
return null;
@@ -1542,6 +1698,11 @@ public class OverseerCollectionProcessor
// if it does not, find best nodes to create more cores
int repFactor = message.getInt( REPLICATION_FACTOR, 1);
+
+ String async = null;
+ if (message.containsKey("async"))
+ async = message.getStr("async");
+
Integer numSlices = message.getInt(NUM_SLICES, null);
String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
List<String> shardNames = new ArrayList<>();
@@ -1627,6 +1788,9 @@ public class OverseerCollectionProcessor
if (!created)
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully createcollection: " + message.getStr("name"));
+ // For tracking async calls.
+ HashMap<String, String> requestMap = new HashMap<String, String>();
+
log.info("Creating SolrCores for new collection, shardNames {} , replicationFactor : {}", shardNames, repFactor);
Map<String ,ShardRequest> coresToCreate = new LinkedHashMap<>();
for (int i = 1; i <= shardNames.size(); i++) {
@@ -1662,6 +1826,9 @@ public class OverseerCollectionProcessor
params.set(CoreAdminParams.COLLECTION, collectionName);
params.set(CoreAdminParams.SHARD, sliceName);
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+
+ setupAsyncRequest(async, requestMap, params, nodeName);
+
addPropertyParams(message, params);
ShardRequest sreq = new ShardRequest();
@@ -1689,13 +1856,9 @@ public class OverseerCollectionProcessor
}
}
- ShardResponse srsp;
- do {
- srsp = shardHandler.takeCompletedOrError();
- if (srsp != null) {
- processResponse(results, srsp);
- }
- } while (srsp != null);
+ processResponses(results);
+
+ completeAsyncRequest(async, requestMap, results);
log.info("Finished create command on all shards for collection: "
+ collectionName);
@@ -1829,6 +1992,18 @@ public class OverseerCollectionProcessor
"ADDREPLICA failed to create replica");
}
+
+ private void processResponses(NamedList results) {
+ ShardResponse srsp;
+ do {
+ srsp = shardHandler.takeCompletedOrError();
+ if (srsp != null) {
+ processResponse(results, srsp);
+ }
+ } while (srsp != null);
+ }
+
+
private String createConfNode(String coll, ZkNodeProps message, boolean isLegacyCloud) throws KeeperException, InterruptedException {
String configName = message.getStr(OverseerCollectionProcessor.COLL_CONF);
if(configName == null){
@@ -1873,14 +2048,8 @@ public class OverseerCollectionProcessor
Slice slice = entry.getValue();
sliceCmd(clusterState, params, stateMatcher, slice);
}
-
- ShardResponse srsp;
- do {
- srsp = shardHandler.takeCompletedOrError();
- if (srsp != null) {
- processResponse(results, srsp);
- }
- } while (srsp != null);
+
+ processResponses(results);
}
@@ -1950,4 +2119,70 @@ public class OverseerCollectionProcessor
return isClosed;
}
+ private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
+ for(String k:requestMap.keySet()) {
+ log.debug("I am Waiting for : " + k + "/" + requestMap.get(k));
+ results.add(requestMap.get(k), waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k)));
+ }
+ }
+
+ private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
+ params.set(CoreAdminParams.REQUESTID, requestId);
+ int counter = 0;
+ ShardRequest sreq;
+ do {
+ sreq = new ShardRequest();
+ params.set("qt", adminPath);
+ sreq.purpose = 1;
+ String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
+ sreq.shards = new String[] {replica};
+ sreq.actualShards = sreq.shards;
+ sreq.params = params;
+
+ shardHandler.submit(sreq, replica, sreq.params);
+
+ ShardResponse srsp;
+ do {
+ srsp = shardHandler.takeCompletedOrError();
+ if (srsp != null) {
+ NamedList results = new NamedList();
+ processResponse(results, srsp);
+ String r = (String) srsp.getSolrResponse().getResponse().get("STATUS");
+ if(r.equals("running")) {
+ log.debug("The task is still RUNNING, continuing to wait.");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ continue;
+
+ } else if(r.equals("completed")) {
+ log.debug("The task is COMPLETED, returning");
+ return srsp.getSolrResponse().getResponse();
+ } else if (r.equals("failed")) {
+ // TODO: Improve this. Get more information.
+ log.debug("The task is FAILED, returning");
+ return srsp.getSolrResponse().getResponse();
+ } else if (r.equals("notfound")) {
+ log.debug("The task is notfound, retry");
+ if(counter++ < 5) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ break;
+ }
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request: " + srsp.getSolrResponse().getResponse().get("STATUS") +
+ "retried " + counter + "times");
+ } else {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request " + srsp.getSolrResponse().getResponse().get("STATUS"));
+ }
+ }
+ } while (srsp != null);
+ } while(true);
+ }
+
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Mon Mar 17 21:29:03 2014
@@ -92,6 +92,10 @@ public final class ZkController {
private final DistributedQueue overseerJobQueue;
private final DistributedQueue overseerCollectionQueue;
+
+ private final DistributedMap overseerRunningMap;
+ private final DistributedMap overseerCompletedMap;
+ private final DistributedMap overseerFailureMap;
public static final String CONFIGS_ZKNODE = "/configs";
@@ -279,6 +283,9 @@ public final class ZkController {
this.overseerJobQueue = Overseer.getInQueue(zkClient);
this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
+ this.overseerRunningMap = Overseer.getRunningMap(zkClient);
+ this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
+ this.overseerFailureMap = Overseer.getFailureMap(zkClient);
cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
leaderElector = new LeaderElector(zkClient);
zkStateReader = new ZkStateReader(zkClient);
@@ -1583,6 +1590,18 @@ public final class ZkController {
public DistributedQueue getOverseerCollectionQueue() {
return overseerCollectionQueue;
}
+
+ public DistributedMap getOverseerRunningMap() {
+ return overseerRunningMap;
+ }
+
+ public DistributedMap getOverseerCompletedMap() {
+ return overseerCompletedMap;
+ }
+
+ public DistributedMap getOverseerFailureMap() {
+ return overseerFailureMap;
+ }
public int getClientTimeout() {
return clientTimeout;
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java Mon Mar 17 21:29:03 2014
@@ -368,6 +368,13 @@ public class CoreContainer {
}
try {
+ coreAdminHandler.shutdown();
+ } catch (Exception e) {
+ log.warn("Error shutting down CoreAdminHandler. Continuing to shutdown CoreContainer.");
+ e.printStackTrace();
+ }
+
+ try {
// First wake up the closer thread, it'll terminate almost immediately since it checks isShutDown.
synchronized (solrCores.getModifyLock()) {
solrCores.getModifyLock().notifyAll(); // wake up anyone waiting
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Mon Mar 17 21:29:03 2014
@@ -18,6 +18,7 @@ package org.apache.solr.handler.admin;
*/
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC;
import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATESHARD;
import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
@@ -25,6 +26,7 @@ import static org.apache.solr.cloud.Over
import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
@@ -51,6 +53,7 @@ import org.apache.solr.client.solrj.requ
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
+import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
@@ -199,6 +202,10 @@ public class CollectionsHandler extends
this.handleAddReplica(req, rsp);
break;
}
+ case REQUESTSTATUS: {
+ this.handleRequestStatus(req, rsp);
+ break;
+ }
default: {
throw new RuntimeException("Unknown action: " + action);
}
@@ -236,6 +243,16 @@ public class CollectionsHandler extends
public static long DEFAULT_ZK_TIMEOUT = 180*1000;
+ private void handleRequestStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+ log.debug("REQUESTSTATUS action invoked: " + req.getParamString());
+ req.getParams().required().check(REQUESTID);
+ Map<String, Object> props = new HashMap<String, Object>();
+ props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.REQUESTSTATUS);
+ props.put(REQUESTID, req.getParams().get(REQUESTID));
+ ZkNodeProps m = new ZkNodeProps(props);
+ handleResponse(OverseerCollectionProcessor.REQUESTSTATUS, m, rsp);
+ }
+
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
@@ -244,6 +261,35 @@ public class CollectionsHandler extends
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
long time = System.nanoTime();
+
+ if(m.containsKey(ASYNC) && m.get(ASYNC) != null) {
+
+ String asyncId = m.getStr(ASYNC);
+
+ if(asyncId.equals("-1")) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
+ }
+
+ NamedList<String> r = new NamedList<>();
+
+ if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
+ coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
+ coreContainer.getZkController().getOverseerRunningMap().contains(asyncId)) {
+ r.add("error", "Task with the same requestid already exists.");
+
+ } else {
+ coreContainer.getZkController().getOverseerCollectionQueue()
+ .offer(ZkStateReader.toJSON(m));
+
+ }
+ r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
+ SolrResponse response = new OverseerSolrResponse(r);
+
+ rsp.getValues().addAll(response.getResponse());
+
+ return;
+ }
+
QueueEvent event = coreContainer.getZkController()
.getOverseerCollectionQueue()
.offer(ZkStateReader.toJSON(m), timeout);
@@ -368,6 +414,7 @@ public class CollectionsHandler extends
MAX_SHARDS_PER_NODE,
CREATE_NODE_SET ,
SHARDS_PROP,
+ ASYNC,
"router.");
copyPropertiesIfNotNull(req.getParams(), props);
@@ -380,7 +427,7 @@ public class CollectionsHandler extends
log.info("Remove replica: " + req.getParamString());
req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP, "replica");
Map<String, Object> map = makeMap(QUEUE_OPERATION, DELETEREPLICA);
- copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica");
+ copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica", ASYNC);
ZkNodeProps m = new ZkNodeProps(map);
handleResponse(DELETEREPLICA, m, rsp);
}
@@ -394,7 +441,7 @@ public class CollectionsHandler extends
throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections" );
Map<String, Object> map = makeMap(QUEUE_OPERATION, CREATESHARD);
- copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR,CREATE_NODE_SET);
+ copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, REPLICATION_FACTOR,CREATE_NODE_SET, ASYNC);
copyPropertiesIfNotNull(req.getParams(), map);
ZkNodeProps m = new ZkNodeProps(map);
handleResponse(CREATESHARD, m, rsp);
@@ -485,6 +532,10 @@ public class CollectionsHandler extends
if (rangesStr != null) {
props.put(CoreAdminParams.RANGES, rangesStr);
}
+
+ if (req.getParams().get(ASYNC) != null)
+ props.put(ASYNC, req.getParams().get(ASYNC));
+
copyPropertiesIfNotNull(req.getParams(), props);
ZkNodeProps m = new ZkNodeProps(props);
@@ -497,7 +548,7 @@ public class CollectionsHandler extends
req.getParams().required().check("collection", "split.key", "target.collection");
Map<String,Object> props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.MIGRATE);
- copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout");
+ copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout", ASYNC);
ZkNodeProps m = new ZkNodeProps(props);
handleResponse(OverseerCollectionProcessor.MIGRATE, m, rsp, DEFAULT_ZK_TIMEOUT * 20);
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Mon Mar 17 21:29:03 2014
@@ -17,21 +17,8 @@
package org.apache.solr.handler.admin;
-import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.lucene.index.DirectoryReader;
@@ -55,6 +42,7 @@ import org.apache.solr.common.params.Cor
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CloseHook;
@@ -75,14 +63,31 @@ import org.apache.solr.update.SplitIndex
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
+import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.NumberUtils;
import org.apache.solr.util.RefCounted;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
/**
*
@@ -91,6 +96,24 @@ import com.google.common.collect.Lists;
public class CoreAdminHandler extends RequestHandlerBase {
protected static Logger log = LoggerFactory.getLogger(CoreAdminHandler.class);
protected final CoreContainer coreContainer;
+ protected static HashMap<String, Map<String, TaskObject>> requestStatusMap =
+ new HashMap<String,Map<String, TaskObject>>();
+
+ protected ExecutorService parallelExecutor = null;
+
+ protected static int MAX_TRACKED_REQUESTS = 100;
+ public static String RUNNING = "running";
+ public static String COMPLETED = "completed";
+ public static String FAILED = "failed";
+ public static String RESPONSE = "Response";
+ public static String RESPONSE_STATUS = "STATUS";
+ public static String RESPONSE_MESSAGE = "msg";
+
+ static {
+ requestStatusMap.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
+ requestStatusMap.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
+ requestStatusMap.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<String, TaskObject>()));
+ }
public CoreAdminHandler() {
super();
@@ -136,6 +159,18 @@ public class CoreAdminHandler extends Re
"Core container instance missing");
}
//boolean doPersist = false;
+ String taskId = req.getParams().get("async");
+ TaskObject taskObject = new TaskObject(taskId);
+
+ if(taskId != null) {
+ // Put the tasks into the maps for tracking
+ if (getMap(RUNNING).containsKey(taskId) || getMap(COMPLETED).containsKey(taskId) || getMap(FAILED).containsKey(taskId)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Duplicate request with the same requestid found.");
+ }
+
+ addTask(RUNNING, taskObject);
+ }
// Pick the action
SolrParams params = req.getParams();
@@ -147,6 +182,19 @@ public class CoreAdminHandler extends Re
this.handleCustomAction(req, rsp);
}
}
+
+ if (taskId == null) {
+ handleRequestInternal(req, rsp, action);
+ } else {
+ ParallelCoreAdminHandlerThread parallelHandlerThread = new ParallelCoreAdminHandlerThread(req, rsp, action, taskObject);
+ if(parallelExecutor == null || parallelExecutor.isShutdown())
+ parallelExecutor = Executors.newFixedThreadPool(50,
+ new DefaultSolrThreadFactory("parallelCoreAdminExecutor"));
+ parallelExecutor.execute(parallelHandlerThread);
+ }
+ }
+
+ protected void handleRequestInternal(SolrQueryRequest req, SolrQueryResponse rsp, CoreAdminAction action) throws Exception {
if (action != null) {
switch (action) {
case CREATE: {
@@ -199,17 +247,17 @@ public class CoreAdminHandler extends Re
this.handleWaitForStateAction(req, rsp);
break;
}
-
+
case REQUESTRECOVERY: {
this.handleRequestRecoveryAction(req, rsp);
break;
}
-
+
case REQUESTSYNCSHARD: {
this.handleRequestSyncAction(req, rsp);
break;
}
-
+
// todo : Can this be done by the regular RecoveryStrategy route?
case REQUESTAPPLYUPDATES: {
this.handleRequestApplyUpdatesAction(req, rsp);
@@ -219,6 +267,10 @@ public class CoreAdminHandler extends Re
this.handleRequestBufferUpdatesAction(req, rsp);
break;
}
+ case REQUESTSTATUS: {
+ this.handleRequestActionStatus(req, rsp);
+ break;
+ }
case OVERSEEROP:{
ZkController zkController = coreContainer.getZkController();
if(zkController != null){
@@ -240,7 +292,7 @@ public class CoreAdminHandler extends Re
rsp.setHttpCaching(false);
}
-
+
/**
* Handle the core admin SPLIT action.
*/
@@ -755,6 +807,28 @@ public class CoreAdminHandler extends Re
}
/**
+ * Handle "REQUESTSTATUS" action
+ */
+ protected void handleRequestActionStatus(SolrQueryRequest req, SolrQueryResponse rsp) {
+ SolrParams params = req.getParams();
+ String requestId = params.get(CoreAdminParams.REQUESTID);
+ log.info("Checking request status for : " + requestId);
+
+ if (mapContainsTask(RUNNING, requestId)) {
+ rsp.add(RESPONSE_STATUS, RUNNING);
+ } else if(mapContainsTask(COMPLETED, requestId)) {
+ rsp.add(RESPONSE_STATUS, COMPLETED);
+ rsp.add(RESPONSE, getMap(COMPLETED).get(requestId).getRspObject());
+ } else if(mapContainsTask(FAILED, requestId)) {
+ rsp.add(RESPONSE_STATUS, FAILED);
+ rsp.add(RESPONSE, getMap(FAILED).get(requestId).getRspObject());
+ } else {
+ rsp.add(RESPONSE_STATUS, "notfound");
+ rsp.add(RESPONSE_MESSAGE, "No task found in running, completed or failed tasks");
+ }
+ }
+
+ /**
* Handle "SWAP" action
*/
protected void handleSwapAction(SolrQueryRequest req, SolrQueryResponse rsp) {
@@ -1172,4 +1246,123 @@ public class CoreAdminHandler extends Re
public String getSource() {
return "$URL$";
}
+
+ /**
+ * Class to implement multi-threaded CoreAdminHandler behaviour.
+ * This accepts all of the context from handleRequestBody.
+ */
+ protected class ParallelCoreAdminHandlerThread implements Runnable {
+ SolrQueryRequest req;
+ SolrQueryResponse rsp;
+ CoreAdminAction action;
+ TaskObject taskObject;
+
+ public ParallelCoreAdminHandlerThread (SolrQueryRequest req, SolrQueryResponse rsp,
+ CoreAdminAction action, TaskObject taskObject){
+ this.req = req;
+ this.rsp = rsp;
+ this.action = action;
+ this.taskObject = taskObject;
+ }
+
+ public void run() {
+ boolean exceptionCaught = false;
+ try {
+ handleRequestInternal(req, rsp, action);
+ taskObject.setRspObject(rsp);
+ } catch (Exception e) {
+ exceptionCaught = true;
+ taskObject.setRspObjectFromException(e);
+ } finally {
+ removeTask("running", taskObject.taskId);
+ if(exceptionCaught) {
+ addTask("failed", taskObject, true);
+ } else
+ addTask("completed", taskObject, true);
+ }
+
+ }
+
+ }
+
+ /**
+ * Helper class to manage the tasks to be tracked.
+ * This contains the taskId, request and the response (if available).
+ */
+ private class TaskObject {
+ String taskId;
+ String rspInfo;
+
+ public TaskObject(String taskId) {
+ this.taskId = taskId;
+ }
+
+ public String getRspObject() {
+ return rspInfo;
+ }
+
+ public void setRspObject(SolrQueryResponse rspObject) {
+ this.rspInfo = rspObject.getToLogAsString("TaskId: " + this.taskId + " ");
+ }
+
+ public void setRspObjectFromException(Exception e) {
+ this.rspInfo = e.getMessage();
+ }
+ }
+
+ /**
+ * Helper method to add a task to a tracking map.
+ */
+ protected void addTask(String map, TaskObject o, boolean limit) {
+ if(limit && getMap(map).size() == MAX_TRACKED_REQUESTS) {
+ String key = getMap(map).entrySet().iterator().next().getKey();
+ getMap(map).remove(key);
+ }
+ addTask(map, o);
+ }
+
+
+ protected void addTask(String map, TaskObject o) {
+ synchronized (getMap(map)) {
+ getMap(map).put(o.taskId, o);
+ }
+ }
+
+ /**
+ * Helper method to remove a task from a tracking map.
+ */
+ protected void removeTask(String map, String taskId) {
+ synchronized (getMap(map)) {
+ getMap(map).remove(taskId);
+ }
+ }
+
+ /**
+ * Helper method to check if a map contains a taskObject with the given taskId.
+ */
+ protected boolean mapContainsTask(String map, String taskId) {
+ return getMap(map).containsKey(taskId);
+ }
+
+ /**
+ * Helper method to get a TaskObject given a map and a taskId.
+ */
+ protected TaskObject getTask(String map, String taskId) {
+ return getMap(map).get(taskId);
+ }
+
+ /**
+ * Helper method to get a request status map given the name.
+ */
+ private Map<String, TaskObject> getMap(String map) {
+ return requestStatusMap.get(map);
+ }
+
+ /**
+ * Method to ensure shutting down of the ThreadPool Executor.
+ */
+ public void shutdown() {
+ if (parallelExecutor != null && !parallelExecutor.isShutdown())
+ ExecutorUtil.shutdownAndAwaitTermination(parallelExecutor);
+ }
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java Mon Mar 17 21:29:03 2014
@@ -114,7 +114,7 @@ public class MigrateRouteKeyTest extends
return ruleRemoved;
}
- private void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
+ protected void invokeMigrateApi(String sourceCollection, String splitKey, String targetCollection) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.MIGRATE.toString());
params.set("collection", sourceCollection);
@@ -125,7 +125,7 @@ public class MigrateRouteKeyTest extends
invoke(params);
}
- private void invoke(ModifiableSolrParams params) throws SolrServerException, IOException {
+ protected void invoke(ModifiableSolrParams params) throws SolrServerException, IOException {
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
@@ -161,7 +161,7 @@ public class MigrateRouteKeyTest extends
waitForRecoveriesToFinish(targetCollection, false);
}
- private void multipleShardMigrateTest() throws Exception {
+ protected void multipleShardMigrateTest() throws Exception {
del("*:*");
commit();
assertTrue(cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound() == 0);
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java Mon Mar 17 21:29:03 2014
@@ -71,6 +71,9 @@ public class OverseerCollectionProcessor
private static final String CONFIG_NAME = "myconfig";
private static DistributedQueue workQueueMock;
+ private static DistributedMap runningMapMock;
+ private static DistributedMap completedMapMock;
+ private static DistributedMap failureMapMock;
private static ShardHandler shardHandlerMock;
private static ZkStateReader zkStateReaderMock;
private static ClusterState clusterStateMock;
@@ -90,8 +93,10 @@ public class OverseerCollectionProcessor
public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
String myId, ShardHandler shardHandler, String adminPath,
- DistributedQueue workQueue) {
- super(zkStateReader, myId, shardHandler, adminPath, workQueue);
+ DistributedQueue workQueue, DistributedMap runningMap,
+ DistributedMap completedMap,
+ DistributedMap failureMap) {
+ super(zkStateReader, myId, shardHandler, adminPath, workQueue, runningMap, completedMap, failureMap);
}
@Override
@@ -111,6 +116,9 @@ public class OverseerCollectionProcessor
@BeforeClass
public static void setUpOnce() throws Exception {
workQueueMock = createMock(DistributedQueue.class);
+ runningMapMock = createMock(DistributedMap.class);
+ completedMapMock = createMock(DistributedMap.class);
+ failureMapMock = createMock(DistributedMap.class);
shardHandlerMock = createMock(ShardHandler.class);
zkStateReaderMock = createMock(ZkStateReader.class);
clusterStateMock = createMock(ClusterState.class);
@@ -120,6 +128,9 @@ public class OverseerCollectionProcessor
@AfterClass
public static void tearDownOnce() {
workQueueMock = null;
+ runningMapMock = null;
+ completedMapMock = null;
+ failureMapMock = null;
shardHandlerMock = null;
zkStateReaderMock = null;
clusterStateMock = null;
@@ -131,13 +142,16 @@ public class OverseerCollectionProcessor
super.setUp();
queue.clear();
reset(workQueueMock);
- reset(workQueueMock);
+ reset(runningMapMock);
+ reset(completedMapMock);
+ reset(failureMapMock);
reset(shardHandlerMock);
reset(zkStateReaderMock);
reset(clusterStateMock);
reset(solrZkClientMock);
underTest = new OverseerCollectionProcessorToBeTested(zkStateReaderMock,
- "1234", shardHandlerMock, ADMIN_PATH, workQueueMock);
+ "1234", shardHandlerMock, ADMIN_PATH, workQueueMock, runningMapMock,
+ completedMapMock, failureMapMock);
zkMap.clear();
collectionsSet.clear();
}
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java Mon Mar 17 21:29:03 2014
@@ -17,21 +17,21 @@
package org.apache.solr.client.solrj.request;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
-import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ContentStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
/**
* This class is experimental and subject to change.
*
@@ -41,6 +41,7 @@ public class CollectionAdminRequest exte
{
protected String collection = null;
protected CollectionAction action = null;
+ protected String asyncId = null;
protected static class CollectionShardAdminRequest extends CollectionAdminRequest {
protected String shardName = null;
@@ -53,6 +54,7 @@ public class CollectionAdminRequest exte
params.remove( "name" );
params.set( "collection", collection );
params.set( "shard", shardName);
+ params.set( "async", asyncId);
return params;
}
@@ -124,6 +126,9 @@ public class CollectionAdminRequest exte
// OverseerCollectionProcessor.REPLICATION_FACTOR
params.set( "replicationFactor", replicationFactor);
}
+ if (asyncId != null) {
+ params.set("async", asyncId);
+ }
return params;
}
@@ -188,6 +193,25 @@ public class CollectionAdminRequest exte
}
}
+ //a request status collection request
+ public static class RequestStatus extends CollectionAdminRequest {
+ protected String requestId = null;
+
+ public RequestStatus() {
+ action = CollectionAction.REQUESTSTATUS;
+ }
+
+ public void setRequestId(String requestId) {this.requestId = requestId; }
+ public String getRequestId() { return this.requestId; }
+
+ @Override
+ public SolrParams getParams() {
+ ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
+ params.set("requestid", requestId);
+ return params;
+ }
+ }
+
//a collection alias create request
public static class CreateAlias extends CollectionAdminRequest {
protected String aliasedCollections = null;
@@ -239,6 +263,10 @@ public class CollectionAdminRequest exte
this.action = action;
}
+ public void setAsyncId(String asyncId) {
+ this.asyncId = asyncId;
+ }
+
//---------------------------------------------------------------------------------------
//
//---------------------------------------------------------------------------------------
@@ -285,7 +313,19 @@ public class CollectionAdminRequest exte
String nodeSet,
String conf,
String routerField,
- SolrServer server ) throws SolrServerException, IOException
+ SolrServer server) throws SolrServerException, IOException
+ {
+ return createCollection(name, shards, repl, maxShards, nodeSet, conf, routerField, server, null);
+ }
+
+ // creates collection using a compositeId router
+ public static CollectionAdminResponse createCollection( String name,
+ Integer shards, Integer repl, Integer maxShards,
+ String nodeSet,
+ String conf,
+ String routerField,
+ SolrServer server,
+ String asyncId) throws SolrServerException, IOException
{
Create req = new Create();
req.setCollectionName(name);
@@ -296,27 +336,47 @@ public class CollectionAdminRequest exte
req.setCreateNodeSet(nodeSet);
req.setConfigName(conf);
req.setRouterField(routerField);
+ req.setAsyncId(asyncId);
return req.process( server );
}
+
+ public static CollectionAdminResponse createCollection(String name, Integer shards,
+ String conf,
+ SolrServer server) throws SolrServerException, IOException {
+ return createCollection(name, shards, conf, server, null);
+ }
+
public static CollectionAdminResponse createCollection( String name,
Integer shards, String conf,
- SolrServer server ) throws SolrServerException, IOException
+ SolrServer server,
+ String asyncId) throws SolrServerException, IOException
{
Create req = new Create();
req.setCollectionName(name);
req.setRouterName("compositeId");
req.setNumShards(shards);
req.setConfigName(conf);
+ req.setAsyncId(asyncId);
return req.process( server );
}
+ public static CollectionAdminResponse createCollection(String name,
+ String shards,
+ Integer repl, Integer maxShards,
+ String conf,
+ String routerField,
+ SolrServer server) throws SolrServerException, IOException {
+ return createCollection(name, shards, repl, maxShards, conf, routerField, null);
+ }
+
// creates a collection using an implicit router
public static CollectionAdminResponse createCollection( String name,
String shards, Integer repl, Integer maxShards,
String nodeSet,
String conf,
String routerField,
- SolrServer server ) throws SolrServerException, IOException
+ SolrServer server,
+ String asyncId) throws SolrServerException, IOException
{
Create req = new Create();
req.setCollectionName(name);
@@ -327,34 +387,68 @@ public class CollectionAdminRequest exte
req.setCreateNodeSet(nodeSet);
req.setConfigName(conf);
req.setRouterField(routerField);
+ req.setAsyncId(asyncId);
return req.process( server );
}
+
+ public static CollectionAdminResponse createCollection( String name,
+ String shards, String conf,
+ SolrServer server) throws SolrServerException, IOException
+ {
+ return createCollection(name, shards, conf, server, null);
+ }
+
public static CollectionAdminResponse createCollection( String name,
String shards, String conf,
- SolrServer server ) throws SolrServerException, IOException
+ SolrServer server, String asyncId ) throws SolrServerException, IOException
{
Create req = new Create();
req.setCollectionName(name);
req.setRouterName("implicit");
req.setShards(shards);
req.setConfigName(conf);
+ req.setAsyncId(asyncId);
return req.process( server );
}
- public static CollectionAdminResponse reloadCollection( String name, SolrServer server ) throws SolrServerException, IOException
+ public static CollectionAdminResponse reloadCollection( String name, SolrServer server)
+ throws SolrServerException, IOException {
+ return reloadCollection(name, server, null);
+ }
+
+ public static CollectionAdminResponse reloadCollection( String name, SolrServer server, String asyncId )
+ throws SolrServerException, IOException
{
CollectionAdminRequest req = new Reload();
req.setCollectionName(name);
+ req.setAsyncId(asyncId);
return req.process( server );
}
- public static CollectionAdminResponse deleteCollection( String name, SolrServer server ) throws SolrServerException, IOException
+ public static CollectionAdminResponse deleteCollection( String name, SolrServer server)
+ throws SolrServerException, IOException
+ {
+ return deleteCollection(name, server, null);
+ }
+
+ public static CollectionAdminResponse deleteCollection( String name, SolrServer server,
+ String asyncId)
+ throws SolrServerException, IOException
{
CollectionAdminRequest req = new Delete();
req.setCollectionName(name);
+ req.setAsyncId(asyncId);
return req.process( server );
}
+ public static CollectionAdminResponse requestStatus(String requestId, SolrServer server)
+ throws SolrServerException, IOException {
+ RequestStatus req = new RequestStatus();
+
+ req.setRequestId(requestId);
+ return req.process(server);
+ }
+
public static CollectionAdminResponse createShard( String name, String shard, String nodeSet, SolrServer server ) throws SolrServerException, IOException
{
CreateShard req = new CreateShard();
@@ -370,15 +464,29 @@ public class CollectionAdminRequest exte
public static CollectionAdminResponse splitShard( String name, String shard, String ranges, SolrServer server ) throws SolrServerException, IOException
{
+ return splitShard(name, shard, ranges, server, null);
+ }
+
+ public static CollectionAdminResponse splitShard( String name, String shard, String ranges, SolrServer server,
+ String asyncId) throws SolrServerException, IOException
+ {
SplitShard req = new SplitShard();
req.setCollectionName(name);
req.setShardName(shard);
req.setRanges(ranges);
+ req.setAsyncId(asyncId);
return req.process( server );
}
- public static CollectionAdminResponse splitShard( String name, String shard, SolrServer server ) throws SolrServerException, IOException
+
+ public static CollectionAdminResponse splitShard(String name, String shard, SolrServer server)
+ throws SolrServerException, IOException {
+ return splitShard(name, shard, null, server, null);
+ }
+
+ public static CollectionAdminResponse splitShard( String name, String shard, SolrServer server,
+ String asyncId ) throws SolrServerException, IOException
{
- return splitShard(name, shard, null, server);
+ return splitShard(name, shard, null, server, asyncId);
}
public static CollectionAdminResponse deleteShard( String name, String shard, SolrServer server ) throws SolrServerException, IOException
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Mon Mar 17 21:29:03 2014
@@ -42,6 +42,7 @@ public interface CollectionParams
ADDROLE,
REMOVEROLE,
CLUSTERPROP,
+ REQUESTSTATUS,
ADDREPLICA;
public static CollectionAction get( String p )
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java?rev=1578598&r1=1578597&r2=1578598&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java Mon Mar 17 21:29:03 2014
@@ -91,7 +91,9 @@ public abstract class CoreAdminParams
public final static String RANGES = "ranges";
public static final String ROLES = "roles";
-
+
+ public static final String REQUESTID = "requestid";
+
public static final String CORE_NODE_NAME = "coreNodeName";
/** Prefix for core property name=value pair **/
@@ -128,7 +130,8 @@ public abstract class CoreAdminParams
REQUESTAPPLYUPDATES,
LOAD_ON_STARTUP,
TRANSIENT,
- OVERSEEROP;
+ OVERSEEROP,
+ REQUESTSTATUS;
public static CoreAdminAction get( String p )
{