You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2015/05/14 18:25:53 UTC
svn commit: r1679397 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/handler/admin/
solrj/src/java/org/apache/solr/common/params/
Author: noble
Date: Thu May 14 16:25:52 2015
New Revision: 1679397
URL: http://svn.apache.org/r1679397
Log:
SOLR-7544: CollectionsHandler refactored to be more modular
Added:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java (with props)
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1679397&r1=1679396&r2=1679397&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu May 14 16:25:52 2015
@@ -360,6 +360,8 @@ Other Changes
the actual ErrorCode is used when available.
(Hrishikesh Gadre via Shawn Heisey)
+* SOLR-7544: CollectionsHandler refactored to be more modular (Noble Paul)
+
================== 5.1.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1679397&r1=1679396&r2=1679397&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Thu May 14 16:25:52 2015
@@ -17,45 +17,30 @@ package org.apache.solr.handler.admin;
* limitations under the License.
*/
-import static org.apache.solr.cloud.Overseer.*;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.*;
-import static org.apache.solr.common.cloud.DocCollection.*;
-import static org.apache.solr.common.cloud.ZkStateReader.*;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
-import static org.apache.solr.common.params.CommonParams.*;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
-import org.apache.solr.cloud.LeaderElector;
-import org.apache.solr.cloud.Overseer;
-import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.rule.Rule;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -63,7 +48,6 @@ import org.apache.solr.common.cloud.ZkSt
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
@@ -77,7 +61,36 @@ import org.apache.zookeeper.KeeperExcept
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableSet;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_IF_DOWN;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
+import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
+import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
+import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CONFIGS_ZKNODE;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VALUE_LONG;
+import static org.apache.solr.common.params.CoreAdminParams.DATA_DIR;
+import static org.apache.solr.common.params.CoreAdminParams.INSTANCE_DIR;
+import static org.apache.solr.common.params.ShardParams._ROUTE_;
public class CollectionsHandler extends RequestHandlerBase {
protected static Logger log = LoggerFactory.getLogger(CollectionsHandler.class);
@@ -85,8 +98,8 @@ public class CollectionsHandler extends
public CollectionsHandler() {
super();
- // Unlike most request handlers, CoreContainer initialization
- // should happen in the constructor...
+ // Unlike most request handlers, CoreContainer initialization
+ // should happen in the constructor...
this.coreContainer = null;
}
@@ -133,528 +146,51 @@ public class CollectionsHandler extends
// Pick the action
SolrParams params = req.getParams();
- CollectionAction action = null;
String a = params.get(CoreAdminParams.ACTION);
if (a != null) {
- action = CollectionAction.get(a);
- }
- if (action == null) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown action: " + a);
- }
-
- switch (action) {
- case CREATE: {
- this.handleCreateAction(req, rsp);
- break;
- }
- case DELETE: {
- this.handleDeleteAction(req, rsp);
- break;
- }
- case RELOAD: {
- this.handleReloadAction(req, rsp);
- break;
- }
- case SYNCSHARD: {
- this.handleSyncShardAction(req, rsp);
- break;
- }
- case CREATEALIAS: {
- this.handleCreateAliasAction(req, rsp);
- break;
- }
- case DELETEALIAS: {
- this.handleDeleteAliasAction(req, rsp);
- break;
- }
- case SPLITSHARD: {
- this.handleSplitShardAction(req, rsp);
- break;
- }
- case DELETESHARD: {
- this.handleDeleteShardAction(req, rsp);
- break;
- }
- case CREATESHARD: {
- this.handleCreateShard(req, rsp);
- break;
- }
- case DELETEREPLICA: {
- this.handleRemoveReplica(req, rsp);
- break;
- }
- case MIGRATE: {
- this.handleMigrate(req, rsp);
- break;
- }
- case ADDROLE: {
- handleRole(ADDROLE, req, rsp);
- break;
- }
- case REMOVEROLE: {
- handleRole(REMOVEROLE, req, rsp);
- break;
- }
- case CLUSTERPROP: {
- this.handleProp(req, rsp);
- break;
- }
- case ADDREPLICA: {
- this.handleAddReplica(req, rsp);
- break;
- }
- case REQUESTSTATUS: {
- this.handleRequestStatus(req, rsp);
- break;
- }
- case OVERSEERSTATUS: {
- this.handleOverseerStatus(req, rsp);
- break;
- }
- case LIST: {
- this.handleListAction(req, rsp);
- break;
- }
- case CLUSTERSTATUS: {
- this.handleClusterStatus(req, rsp);
- break;
- }
- case ADDREPLICAPROP: {
- this.handleAddReplicaProp(req, rsp);
- break;
- }
- case DELETEREPLICAPROP: {
- this.handleDeleteReplicaProp(req, rsp);
- break;
- }
- case BALANCESHARDUNIQUE: {
- this.handleBalanceShardUnique(req, rsp);
- break;
- }
- case REBALANCELEADERS: {
- this.handleBalanceLeaders(req, rsp);
- break;
- }
- default: {
- throw new RuntimeException("Unknown action: " + action);
- }
- }
-
- rsp.setHttpCaching(false);
- }
-
-
- private void handleBalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- req.getParams().required().check(COLLECTION_PROP);
-
- String collectionName = req.getParams().get(COLLECTION_PROP);
- if (StringUtils.isBlank(collectionName)) {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the REASSIGNLEADERS command."));
- }
- coreContainer.getZkController().getZkStateReader().updateClusterState(true);
- ClusterState clusterState = coreContainer.getZkController().getClusterState();
- DocCollection dc = clusterState.getCollection(collectionName);
- if (dc == null) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
- }
- Map<String, String> currentRequests = new HashMap<>();
- int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
- if (max <= 0) max = Integer.MAX_VALUE;
- int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
- NamedList<Object> results = new NamedList<>();
-
- boolean keepGoing = true;
- for (Slice slice : dc.getSlices()) {
- insurePreferredIsLeader(req, results, slice, currentRequests);
- if (currentRequests.size() == max) {
- log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
- keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results);
- if (keepGoing == false) {
- break; // If we've waited longer than specified, don't continue to wait!
- }
+ CollectionAction action = CollectionAction.get(a);
+ if (action == null)
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
+ CollectionOperation operation = CollectionOperation.get(action);
+ log.info("Invoked Collection Action :{} with params{} ", action.toLower(), req.getParamString());
+ Map<String, Object> result = operation.call(req, rsp, this);
+ if (result != null) {
+ result.put(QUEUE_OPERATION, operation.action.toLower());
+ ZkNodeProps props = new ZkNodeProps(result);
+ handleResponse(operation.action.toLower(), props, rsp, operation.timeOut);
}
- }
- if (keepGoing == true) {
- keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results);
- }
- if (keepGoing == true) {
- log.info("All leader reassignments completed.");
} else {
- log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
- }
-
- rsp.getValues().addAll(results);
- }
-
- private void insurePreferredIsLeader(SolrQueryRequest req, NamedList<Object> results,
- Slice slice, Map<String, String> currentRequests) throws KeeperException, InterruptedException {
- final String inactivePreferreds = "inactivePreferreds";
- final String alreadyLeaders = "alreadyLeaders";
- String collectionName = req.getParams().get(COLLECTION_PROP);
-
- for (Replica replica : slice.getReplicas()) {
- // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
- if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
- continue;
- }
- // OK, we are the preferred leader, are we the actual leader?
- if (replica.getBool(LEADER_PROP, false)) {
- //We're a preferred leader, but we're _also_ the leader, don't need to do anything.
- NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
- if (noops == null) {
- noops = new NamedList<>();
- results.add(alreadyLeaders, noops);
- }
- NamedList<Object> res = new NamedList<>();
- res.add("status", "success");
- res.add("msg", "Already leader");
- res.add("shard", slice.getName());
- res.add("nodeName", replica.getNodeName());
- noops.add(replica.getName(), res);
- return; // already the leader, do nothing.
- }
-
- // We're the preferred leader, but someone else is leader. Only become leader if we're active.
- if (replica.getState() != Replica.State.ACTIVE) {
- NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
- if (inactives == null) {
- inactives = new NamedList<>();
- results.add(inactivePreferreds, inactives);
- }
- NamedList<Object> res = new NamedList<>();
- res.add("status", "skipped");
- res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
- res.add("shard", slice.getName());
- res.add("nodeName", replica.getNodeName());
- inactives.add(replica.getName(), res);
- return; // Don't try to become the leader if we're not active!
- }
-
- // Replica is the preferred leader but not the actual leader, do something about that.
- // "Something" is
- // 1> if the preferred leader isn't first in line, tell it to re-queue itself.
- // 2> tell the actual leader to re-queue itself.
-
- ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
-
- List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
- ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
-
- if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
- log.warn("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
- "election queue, but replica " + replica.getName() + " doesn't think it's the leader. Do nothing");
- return;
- }
-
- // Ok, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole
- // string is used, but that sorts nodes with the same sequence number by their session IDs from ZK.
- // While this is determinate, it's not quite what we need, so re-queue nodes that aren't us and are
- // watching the leader node..
-
- String firstWatcher = electionNodes.get(1);
-
- if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) {
- makeReplicaFirstWatcher(collectionName, slice, replica);
- }
-
- String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP);
- rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false);
- waitForNodeChange(collectionName, slice, electionNodes.get(0));
-
-
- return; // Done with this slice, skip the rest of the replicas.
- }
- }
- // Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list
- void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica)
- throws KeeperException, InterruptedException {
-
- ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
- List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
- ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
-
- // First, queue up the preferred leader at the head of the queue.
- int newSeq = -1;
- for (String electionNode : electionNodes) {
- if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
- String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
- rejoinElection(collectionName, slice, electionNode, coreName, true);
- newSeq = waitForNodeChange(collectionName, slice, electionNode);
- break;
- }
- }
- if (newSeq == -1) {
- return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
- }
-
- List<String> electionNodesTmp = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
- ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
-
-
- // Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
- electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
- ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
-
- for (String thisNode : electionNodes) {
- if (LeaderElector.getSeq(thisNode) > newSeq) {
- break;
- }
- if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) {
- continue;
- }
- if (LeaderElector.getSeq(thisNode) == newSeq) {
- String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP);
- rejoinElection(collectionName, slice, thisNode, coreName, false);
- waitForNodeChange(collectionName, slice, thisNode);
- }
- }
- }
-
- int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException {
- String nodeName = LeaderElector.getNodeName(electionNode);
- int oldSeq = LeaderElector.getSeq(electionNode);
- for (int idx = 0; idx < 600; ++idx) {
- ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
- List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
- ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
- for (String testNode : electionNodes) {
- if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) {
- return LeaderElector.getSeq(testNode);
- }
- }
-
- Thread.sleep(100);
- }
- return -1;
- }
- private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
- boolean rejoinAtHead) throws KeeperException, InterruptedException {
- Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode));
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(COLLECTION_PROP, collectionName);
- propMap.put(SHARD_ID_PROP, slice.getName());
- propMap.put(Overseer.QUEUE_OPERATION, REBALANCELEADERS.toLower());
- propMap.put(CORE_NAME_PROP, core);
- propMap.put(NODE_NAME_PROP, replica.getName());
- propMap.put(ZkStateReader.BASE_URL_PROP, replica.getProperties().get(ZkStateReader.BASE_URL_PROP));
- propMap.put(REJOIN_AT_HEAD_PROP, Boolean.toString(rejoinAtHead)); // Get ourselves to be first in line.
- propMap.put(ELECTION_NODE_PROP, electionNode);
- String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime());
- propMap.put(ASYNC, asyncId);
- ZkNodeProps m = new ZkNodeProps(propMap);
- SolrQueryResponse rspIgnore = new SolrQueryResponse(); // I'm constructing my own response
- handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here.
- }
-
- // currentAsyncIds - map of request IDs and reporting data (value)
- // maxWaitSecs - How long are we going to wait? Defaults to 30 seconds.
- // waitForAll - if true, do not return until all assignments have been made.
- // results - a place to stash results for reporting back to the user.
- //
- private boolean waitForLeaderChange(Map<String, String> currentAsyncIds, final int maxWaitSecs,
- Boolean waitForAll, NamedList<Object> results)
- throws KeeperException, InterruptedException {
-
- if (currentAsyncIds.size() == 0) return true;
-
- for (int idx = 0; idx < maxWaitSecs * 10; ++idx) {
- Iterator<Map.Entry<String, String>> iter = currentAsyncIds.entrySet().iterator();
- boolean foundChange = false;
- while (iter.hasNext()) {
- Map.Entry<String, String> pair = iter.next();
- String asyncId = pair.getKey();
- if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
- coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
- NamedList<Object> fails = (NamedList<Object>) results.get("failures");
- if (fails == null) {
- fails = new NamedList<>();
- results.add("failures", fails);
- }
- NamedList<Object> res = new NamedList<>();
- res.add("status", "failed");
- res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader");
- fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
- iter.remove();
- foundChange = true;
- } else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) {
- coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
- NamedList<Object> successes = (NamedList<Object>) results.get("successes");
- if (successes == null) {
- successes = new NamedList<>();
- results.add("successes", successes);
- }
- NamedList<Object> res = new NamedList<>();
- res.add("status", "success");
- res.add("msg", "Assigned '" + pair.getValue() + "' to be leader");
- successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
- iter.remove();
- foundChange = true;
- }
- }
- // We're done if we're processing a few at a time or all requests are processed.
- if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) {
- return true;
- }
- Thread.sleep(100); //TODO: Is there a better thing to do than sleep here?
- }
- return false;
- }
- private void handleAddReplicaProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_VALUE_PROP);
-
-
- Map<String, Object> map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower());
- copyIfNotNull(req.getParams(), map, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP,
- SHARD_UNIQUE, PROPERTY_VALUE_PROP);
-
- String property = (String) map.get(PROPERTY_PROP);
- if (property.startsWith(OverseerCollectionProcessor.COLL_PROP_PREFIX) == false) {
- property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
- }
-
- boolean uniquePerSlice = Boolean.parseBoolean((String) map.get(SHARD_UNIQUE));
-
- // Check if we're trying to set a property with parameters that allow us to set the property on multiple replicas
- // in a slice on properties that are known to only be one-per-slice and error out if so.
- if (StringUtils.isNotBlank((String)map.get(SHARD_UNIQUE)) &&
- SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property.toLowerCase(Locale.ROOT)) &&
- uniquePerSlice == false) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Overseer replica property command received for property " + property +
- " with the " + SHARD_UNIQUE +
- " parameter set to something other than 'true'. No action taken.");
- }
- handleResponse(ADDREPLICAPROP.toLower(), new ZkNodeProps(map), rsp);
- }
-
- private void handleDeleteReplicaProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP, SHARD_ID_PROP, REPLICA_PROP);
-
- Map<String, Object> map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower());
- copyIfNotNull(req.getParams(), map, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
+ throw new SolrException(ErrorCode.BAD_REQUEST, "action is a required param");
- handleResponse(DELETEREPLICAPROP.toLower(), new ZkNodeProps(map), rsp);
- }
-
-
-
- private void handleBalanceShardUnique(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP);
- Boolean shardUnique = Boolean.parseBoolean(req.getParams().get(SHARD_UNIQUE));
- String prop = req.getParams().get(PROPERTY_PROP).toLowerCase(Locale.ROOT);
- if (StringUtils.startsWith(prop, OverseerCollectionProcessor.COLL_PROP_PREFIX) == false) {
- prop = OverseerCollectionProcessor.COLL_PROP_PREFIX + prop;
}
- if (shardUnique == false &&
- SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop) == false) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that"
- + " the property be pre-defined as a unique property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true'. " +
- " Property: " + prop + " shardUnique: " + Boolean.toString(shardUnique));
- }
-
- Map<String, Object> map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
- copyIfNotNull(req.getParams(), map, COLLECTION_PROP, PROPERTY_PROP, ONLY_ACTIVE_NODES, SHARD_UNIQUE);
-
- handleResponse(BALANCESHARDUNIQUE.toLower(), new ZkNodeProps(map), rsp);
+ rsp.setHttpCaching(false);
}
- private void handleOverseerStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- Map<String, Object> props = ZkNodeProps.makeMap(
- Overseer.QUEUE_OPERATION, OVERSEERSTATUS.toLower());
- handleResponse(OVERSEERSTATUS.toLower(), new ZkNodeProps(props), rsp);
- }
- private void handleProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- req.getParams().required().check(NAME);
- String name = req.getParams().get(NAME);
- String val = req.getParams().get(VALUE_LONG);
- coreContainer.getZkController().getZkStateReader().setClusterProperty(name, val);
- }
- static Set<String> KNOWN_ROLES = ImmutableSet.of("overseer");
- private void handleRole(CollectionAction action, SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- req.getParams().required().check("role", "node");
- Map<String, Object> map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, action.toLower());
- copyIfNotNull(req.getParams(), map,"role", "node");
- ZkNodeProps m = new ZkNodeProps(map);
- if(!KNOWN_ROLES.contains(m.getStr("role"))) throw new SolrException(ErrorCode.BAD_REQUEST,"Unknown role. Supported roles are ,"+ KNOWN_ROLES);
- handleResponse(action.toString().toLowerCase(Locale.ROOT), m, rsp);
- }
+ static final Set<String> KNOWN_ROLES = ImmutableSet.of("overseer");
public static long DEFAULT_ZK_TIMEOUT = 180*1000;
- private void handleRequestStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- log.debug("REQUESTSTATUS action invoked: " + req.getParamString());
- req.getParams().required().check(REQUESTID);
-
- String requestId = req.getParams().get(REQUESTID);
-
- if (requestId.equals("-1")) {
- // Special taskId (-1), clears up the request state maps.
- if(requestId.equals("-1")) {
- coreContainer.getZkController().getOverseerCompletedMap().clear();
- coreContainer.getZkController().getOverseerFailureMap().clear();
- return;
- }
- } else {
- NamedList<Object> results = new NamedList<>();
- if (coreContainer.getZkController().getOverseerCompletedMap().contains(requestId)) {
- SimpleOrderedMap success = new SimpleOrderedMap();
- success.add("state", "completed");
- success.add("msg", "found " + requestId + " in completed tasks");
- results.add("status", success);
- } else if (coreContainer.getZkController().getOverseerFailureMap().contains(requestId)) {
- SimpleOrderedMap success = new SimpleOrderedMap();
- success.add("state", "failed");
- success.add("msg", "found " + requestId + " in failed tasks");
- results.add("status", success);
- } else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) {
- SimpleOrderedMap success = new SimpleOrderedMap();
- success.add("state", "running");
- success.add("msg", "found " + requestId + " in running tasks");
- results.add("status", success);
- } else if(overseerCollectionQueueContains(requestId)){
- SimpleOrderedMap success = new SimpleOrderedMap();
- success.add("state", "submitted");
- success.add("msg", "found " + requestId + " in submitted tasks");
- results.add("status", success);
- } else {
- SimpleOrderedMap failure = new SimpleOrderedMap();
- failure.add("state", "notfound");
- failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue");
- results.add("status", failure);
- }
- SolrResponse response = new OverseerSolrResponse(results);
-
- rsp.getValues().addAll(response.getResponse());
- }
- }
-
- private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException {
- DistributedQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
- return collectionQueue.containsTaskWithRequestId(asyncId);
- }
-
- private void handleResponse(String operation, ZkNodeProps m,
+ void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp) throws KeeperException, InterruptedException {
handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
}
-
+
private void handleResponse(String operation, ZkNodeProps m,
SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
long time = System.nanoTime();
if(m.containsKey(ASYNC) && m.get(ASYNC) != null) {
-
+
String asyncId = m.getStr(ASYNC);
-
+
if(asyncId.equals("-1")) {
throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
}
-
+
NamedList<String> r = new NamedList<>();
if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
@@ -662,16 +198,16 @@ public class CollectionsHandler extends
coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
overseerCollectionQueueContains(asyncId)) {
r.add("error", "Task with the same requestid already exists.");
-
+
} else {
coreContainer.getZkController().getOverseerCollectionQueue()
.offer(ZkStateReader.toJSON(m));
}
r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
SolrResponse response = new OverseerSolrResponse(r);
-
+
rsp.getValues().addAll(response.getResponse());
-
+
return;
}
@@ -702,139 +238,10 @@ public class CollectionsHandler extends
}
}
}
-
- private void handleReloadAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- log.info("Reloading Collection : " + req.getParamString());
- String name = req.getParams().required().get(NAME);
-
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- RELOAD.toLower(), NAME, name);
-
- handleResponse(RELOAD.toLower(), m, rsp);
- }
-
- private void handleSyncShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException, SolrServerException, IOException {
- log.info("Syncing shard : " + req.getParamString());
- String collection = req.getParams().required().get("collection");
- String shard = req.getParams().required().get("shard");
-
- ClusterState clusterState = coreContainer.getZkController().getClusterState();
-
- ZkNodeProps leaderProps = clusterState.getLeader(collection, shard);
- ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
-
- ;
- try (HttpSolrClient client = new HttpSolrClient(nodeProps.getBaseUrl())) {
- client.setConnectionTimeout(15000);
- client.setSoTimeout(60000);
- RequestSyncShard reqSyncShard = new CoreAdminRequest.RequestSyncShard();
- reqSyncShard.setCollection(collection);
- reqSyncShard.setShard(shard);
- reqSyncShard.setCoreName(nodeProps.getCoreName());
- client.request(reqSyncShard);
- }
- }
-
- private void handleCreateAliasAction(SolrQueryRequest req,
- SolrQueryResponse rsp) throws Exception {
- log.info("Create alias action : " + req.getParamString());
- String name = req.getParams().required().get(NAME);
- String collections = req.getParams().required().get("collections");
-
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- CREATEALIAS.toLower(), NAME, name, "collections",
- collections);
-
- handleResponse(CREATEALIAS.toLower(), m, rsp);
- }
-
- private void handleDeleteAliasAction(SolrQueryRequest req,
- SolrQueryResponse rsp) throws Exception {
- log.info("Delete alias action : " + req.getParamString());
- String name = req.getParams().required().get(NAME);
-
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- DELETEALIAS.toLower(), NAME, name);
-
- handleResponse(DELETEALIAS.toLower(), m, rsp);
- }
-
- private void handleDeleteAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- log.info("Deleting Collection : " + req.getParamString());
-
- String name = req.getParams().required().get(NAME);
-
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- DELETE.toLower(), NAME, name);
-
- handleResponse(DELETE.toLower(), m, rsp);
- }
-
- // very simple currently, you can pass a template collection, and the new collection is created on
- // every node the template collection is on
- // there is a lot more to add - you should also be able to create with an explicit server list
- // we might also want to think about error handling (add the request to a zk queue and involve overseer?)
- // as well as specific replicas= options
- private void handleCreateAction(SolrQueryRequest req,
- SolrQueryResponse rsp) throws InterruptedException, KeeperException {
- log.info("Creating Collection : " + req.getParamString());
- String name = req.getParams().required().get(NAME);
- if (name == null) {
- log.error("Collection name is required to create a new collection");
- throw new SolrException(ErrorCode.BAD_REQUEST,
- "Collection name is required to create a new collection");
- }
-
- Map<String,Object> props = ZkNodeProps.makeMap(
- Overseer.QUEUE_OPERATION,
- CREATE.toLower(),
- "fromApi","true");
- copyIfNotNull(req.getParams(),props,
- NAME,
- REPLICATION_FACTOR,
- COLL_CONF,
- NUM_SLICES,
- MAX_SHARDS_PER_NODE,
- CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE,
- SHARDS_PROP,
- ASYNC,
- DocCollection.STATE_FORMAT,
- AUTO_ADD_REPLICAS,
- "router.");
- if(props.get(DocCollection.STATE_FORMAT) == null){
- props.put(DocCollection.STATE_FORMAT,"2");
- }
- addRuleMap(req.getParams(), props, "rule");
- addRuleMap(req.getParams(), props, "snitch");
-
- if(SYSTEM_COLL.equals(name)){
- //We must always create asystem collection with only a single shard
- props.put(NUM_SLICES,1);
- props.remove(SHARDS_PROP);
- createSysConfigSet();
-
- }
- copyPropertiesIfNotNull(req.getParams(), props);
-
- ZkNodeProps m = new ZkNodeProps(props);
- handleResponse(CREATE.toLower(), m, rsp);
- }
-
- private void addRuleMap(SolrParams params, Map<String, Object> props, String key) {
- String[] rules = params.getParams(key);
- if(rules!= null && rules.length >0){
- ArrayList<Map> l = new ArrayList<>();
- for (String rule : rules) l.add(Rule.parseRule(rule));
- props.put(key, l);
- }
- }
- private void createSysConfigSet() throws KeeperException, InterruptedException {
- SolrZkClient zk = coreContainer.getZkController().getZkStateReader().getZkClient();
- createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE, null);
- createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE+"/"+SYSTEM_COLL, null);
- createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE+"/"+SYSTEM_COLL+"/schema.xml", BlobHandler.SCHEMA.replaceAll("'","\"").getBytes(StandardCharsets.UTF_8));
- createNodeIfNotExists(zk, ZkStateReader.CONFIGS_ZKNODE + "/" + SYSTEM_COLL + "/solrconfig.xml", BlobHandler.CONF.replaceAll("'", "\"").getBytes(StandardCharsets.UTF_8));
+ private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException {
+ DistributedQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
+ return collectionQueue.containsTaskWithRequestId(asyncId);
}
public static void createNodeIfNotExists(SolrZkClient zk, String path, byte[] data) throws KeeperException, InterruptedException {
@@ -848,199 +255,452 @@ public class CollectionsHandler extends
}
}
-
- private void handleRemoveReplica(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- log.info("Remove replica: " + req.getParamString());
- req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP, "replica");
- Map<String, Object> map = makeMap(QUEUE_OPERATION, DELETEREPLICA.toLower());
- copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica", ASYNC, ONLY_IF_DOWN);
- ZkNodeProps m = new ZkNodeProps(map);
- handleResponse(DELETEREPLICA.toLower(), m, rsp);
- }
-
-
- private void handleCreateShard(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- log.info("Create shard: " + req.getParamString());
- req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP);
- ClusterState clusterState = coreContainer.getZkController().getClusterState();
- if (!ImplicitDocRouter.NAME.equals(((Map) clusterState.getCollection(req.getParams().get(COLLECTION_PROP)).get(DOC_ROUTER)).get(NAME)))
- throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections" );
-
- Map<String, Object> map = makeMap(QUEUE_OPERATION, CREATESHARD.toLower());
- copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, ZkStateReader.REPLICATION_FACTOR, CREATE_NODE_SET, ASYNC);
- copyPropertiesIfNotNull(req.getParams(), map);
- ZkNodeProps m = new ZkNodeProps(map);
- handleResponse(CREATESHARD.toLower(), m, rsp);
- }
-
- private static void copyIfNotNull(SolrParams params, Map<String, Object> props, String... keys) {
- ArrayList<String> prefixes = new ArrayList<>(1);
- if(keys !=null){
- for (String key : keys) {
- if(key.endsWith(".")) {
- prefixes.add(key);
- continue;
- }
- String v = params.get(key);
- if(v != null) props.put(key,v);
- }
- }
- if(prefixes.isEmpty()) return;
- Iterator<String> it = params.getParameterNamesIterator();
- String prefix = null;
- for(;it.hasNext();){
- String name = it.next();
- for (int i = 0; i < prefixes.size(); i++) {
- if(name.startsWith(prefixes.get(i))){
- String val = params.get(name);
- if(val !=null) props.put(name,val);
- }
- }
- }
-
- }
-
- private void copyPropertiesIfNotNull(SolrParams params, Map<String, Object> props) {
+ private static Map<String, Object> copyPropertiesWithPrefix(SolrParams params, Map<String, Object> props, String prefix) {
Iterator<String> iter = params.getParameterNamesIterator();
while (iter.hasNext()) {
String param = iter.next();
- if (param.startsWith(OverseerCollectionProcessor.COLL_PROP_PREFIX)) {
+ if (param.startsWith(prefix)) {
props.put(param, params.get(param));
}
}
+ return props;
}
+ public static ModifiableSolrParams params(String... params) {
+ ModifiableSolrParams msp = new ModifiableSolrParams();
+ for (int i = 0; i < params.length; i += 2) {
+ msp.add(params[i], params[i + 1]);
+ }
+ return msp;
+ }
- private void handleDeleteShardAction(SolrQueryRequest req,
- SolrQueryResponse rsp) throws InterruptedException, KeeperException {
- log.info("Deleting Shard : " + req.getParamString());
- String name = req.getParams().required().get(ZkStateReader.COLLECTION_PROP);
- String shard = req.getParams().required().get(ZkStateReader.SHARD_ID_PROP);
-
- Map<String,Object> props = new HashMap<>();
- props.put(ZkStateReader.COLLECTION_PROP, name);
- props.put(Overseer.QUEUE_OPERATION, DELETESHARD.toLower());
- props.put(ZkStateReader.SHARD_ID_PROP, shard);
+ //////////////////////// SolrInfoMBeans methods //////////////////////
- ZkNodeProps m = new ZkNodeProps(props);
- handleResponse(DELETESHARD.toLower(), m, rsp);
+ @Override
+ public String getDescription() {
+ return "Manage SolrCloud Collections";
}
- private void handleSplitShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- log.info("Splitting shard : " + req.getParamString());
- String name = req.getParams().required().get("collection");
- // TODO : add support for multiple shards
- String shard = req.getParams().get("shard");
- String rangesStr = req.getParams().get(CoreAdminParams.RANGES);
- String splitKey = req.getParams().get("split.key");
+ public static final String SYSTEM_COLL = ".system";
- if (splitKey == null && shard == null) {
- throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "Missing required parameter: shard");
- }
- if (splitKey != null && shard != null) {
- throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
- "Only one of 'shard' or 'split.key' should be specified");
- }
- if (splitKey != null && rangesStr != null) {
- throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
- "Only one of 'ranges' or 'split.key' should be specified");
- }
+ enum CollectionOperation {
+ /**
+ * very simple currently, you can pass a template collection, and the new collection is created on
+ * every node the template collection is on
+ * there is a lot more to add - you should also be able to create with an explicit server list
+ * we might also want to think about error handling (add the request to a zk queue and involve overseer?)
+ * as well as specific replicas= options
+ */
+ CREATE_OP(CREATE) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h)
+ throws KeeperException, InterruptedException {
+ Map<String, Object> props = req.getParams().required().getAll(null, NAME);
+ props.put("fromApi", "true");
+ req.getParams().getAll(props,
+ NAME,
+ REPLICATION_FACTOR,
+ COLL_CONF,
+ NUM_SLICES,
+ MAX_SHARDS_PER_NODE,
+ CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE,
+ SHARDS_PROP,
+ ASYNC,
+ STATE_FORMAT,
+ AUTO_ADD_REPLICAS);
- Map<String,Object> props = new HashMap<>();
- props.put(Overseer.QUEUE_OPERATION, SPLITSHARD.toLower());
- props.put("collection", name);
- if (shard != null) {
- props.put(ZkStateReader.SHARD_ID_PROP, shard);
- }
- if (splitKey != null) {
- props.put("split.key", splitKey);
- }
- if (rangesStr != null) {
- props.put(CoreAdminParams.RANGES, rangesStr);
- }
+ if (props.get(STATE_FORMAT) == null) {
+ props.put(STATE_FORMAT, "2");
+ }
+ addRuleMap(req.getParams(), props, "rule");
+ addRuleMap(req.getParams(), props, "snitch");
- if (req.getParams().get(ASYNC) != null)
- props.put(ASYNC, req.getParams().get(ASYNC));
+ if (SYSTEM_COLL.equals(props.get(NAME))) {
+ //We must always create asystem collection with only a single shard
+ props.put(NUM_SLICES, 1);
+ props.remove(SHARDS_PROP);
+ createSysConfigSet(h.coreContainer);
- copyPropertiesIfNotNull(req.getParams(), props);
+ }
+ copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX);
+ return copyPropertiesWithPrefix(req.getParams(), props, "router.");
- ZkNodeProps m = new ZkNodeProps(props);
+ }
- handleResponse(SPLITSHARD.toLower(), m, rsp, DEFAULT_ZK_TIMEOUT * 5);
- }
+ private void addRuleMap(SolrParams params, Map<String, Object> props, String key) {
+ String[] rules = params.getParams(key);
+ if (rules != null && rules.length > 0) {
+ ArrayList<Map> l = new ArrayList<>();
+ for (String rule : rules) l.add(Rule.parseRule(rule));
+ props.put(key, l);
+ }
+ }
- private void handleMigrate(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- log.info("Migrate action invoked: " + req.getParamString());
- req.getParams().required().check("collection", "split.key", "target.collection");
- Map<String,Object> props = new HashMap<>();
- props.put(Overseer.QUEUE_OPERATION, MIGRATE.toLower());
- copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout", ASYNC);
- ZkNodeProps m = new ZkNodeProps(props);
- handleResponse(MIGRATE.toLower(), m, rsp, DEFAULT_ZK_TIMEOUT * 20);
- }
+ private void createSysConfigSet(CoreContainer coreContainer) throws KeeperException, InterruptedException {
+ SolrZkClient zk = coreContainer.getZkController().getZkStateReader().getZkClient();
+ createNodeIfNotExists(zk, CONFIGS_ZKNODE, null);
+ createNodeIfNotExists(zk, CONFIGS_ZKNODE + "/" + SYSTEM_COLL, null);
+ createNodeIfNotExists(zk, CONFIGS_ZKNODE + "/" + SYSTEM_COLL + "/schema.xml",
+ BlobHandler.SCHEMA.replaceAll("'", "\"").getBytes(UTF_8));
+ createNodeIfNotExists(zk, CONFIGS_ZKNODE + "/" + SYSTEM_COLL + "/solrconfig.xml",
+ BlobHandler.CONF.replaceAll("'", "\"").getBytes(UTF_8));
+ }
+ },
+ DELETE_OP(DELETE) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
+ throws Exception {
+ return req.getParams().required().getAll(null, NAME);
+ }
+ },
+ RELOAD_OP(RELOAD) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
+ throws Exception {
+ return req.getParams().required().getAll(null, NAME);
+
+ }
+ },
+ SYNCSHARD_OP(SYNCSHARD) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h)
+ throws Exception {
+ String collection = req.getParams().required().get("collection");
+ String shard = req.getParams().required().get("shard");
+
+ ClusterState clusterState = h.coreContainer.getZkController().getClusterState();
+
+ ZkNodeProps leaderProps = clusterState.getLeader(collection, shard);
+ ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
+
+ ;
+ try (HttpSolrClient client = new HttpSolrClient(nodeProps.getBaseUrl())) {
+ client.setConnectionTimeout(15000);
+ client.setSoTimeout(60000);
+ RequestSyncShard reqSyncShard = new CoreAdminRequest.RequestSyncShard();
+ reqSyncShard.setCollection(collection);
+ reqSyncShard.setShard(shard);
+ reqSyncShard.setCoreName(nodeProps.getCoreName());
+ client.request(reqSyncShard);
+ }
+ return null;
+ }
- private void handleAddReplica(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- log.info("Add replica action invoked: " + req.getParamString());
- Map<String,Object> props = new HashMap<>();
- props.put(Overseer.QUEUE_OPERATION, CollectionAction.ADDREPLICA.toString());
- copyIfNotNull(req.getParams(), props, COLLECTION_PROP, "node", SHARD_ID_PROP, ShardParams._ROUTE_,
- CoreAdminParams.NAME, CoreAdminParams.INSTANCE_DIR, CoreAdminParams.DATA_DIR, ASYNC);
- copyPropertiesIfNotNull(req.getParams(), props);
- ZkNodeProps m = new ZkNodeProps(props);
- handleResponse(CollectionAction.ADDREPLICA.toString(), m, rsp);
- }
+ },
+ CREATEALIAS_OP(CREATEALIAS) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
+ throws Exception {
+ return req.getParams().required().getAll(null, NAME, "collections");
+ }
+ },
+ DELETEALIAS_OP(DELETEALIAS) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
+ throws Exception {
+ return req.getParams().required().getAll(null, NAME);
+ }
+
+ },
+ SPLITSHARD_OP(SPLITSHARD, DEFAULT_ZK_TIMEOUT * 5) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h)
+ throws Exception {
+ String name = req.getParams().required().get(COLLECTION_PROP);
+ // TODO : add support for multiple shards
+ String shard = req.getParams().get(SHARD_ID_PROP);
+ String rangesStr = req.getParams().get(CoreAdminParams.RANGES);
+ String splitKey = req.getParams().get("split.key");
- /**
- * Handle cluster status request.
- * Can return status per specific collection/shard or per all collections.
- *
- * @param req solr request
- * @param rsp solr response
- */
- private void handleClusterStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- Map<String,Object> props = new HashMap<>();
- props.put(Overseer.QUEUE_OPERATION, CollectionAction.CLUSTERSTATUS.toLower());
- copyIfNotNull(req.getParams(), props, COLLECTION_PROP, SHARD_ID_PROP, ShardParams._ROUTE_);
- handleResponse(CollectionAction.CLUSTERSTATUS.toString(), new ZkNodeProps(props), rsp);
- }
+ if (splitKey == null && shard == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Missing required parameter: shard");
+ }
+ if (splitKey != null && shard != null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Only one of 'shard' or 'split.key' should be specified");
+ }
+ if (splitKey != null && rangesStr != null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "Only one of 'ranges' or 'split.key' should be specified");
+ }
- /**
- * Handled list collection request.
- * Do list collection request to zk host
- *
- * @param req solr request
- * @param rsp solr response
- * @throws KeeperException zk connection failed
- * @throws InterruptedException connection interrupted
- */
- private void handleListAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
- NamedList<Object> results = new NamedList<>();
- Set<String> collections = coreContainer.getZkController().getZkStateReader().getClusterState().getCollections();
- List<String> collectionList = new ArrayList<>();
- for (String collection : collections) {
- collectionList.add(collection);
- }
- results.add("collections", collectionList);
- SolrResponse response = new OverseerSolrResponse(results);
+ Map<String, Object> map = req.getParams().getAll(null,
+ COLLECTION_PROP,
+ SHARD_ID_PROP,
+ "split.key",
+ CoreAdminParams.RANGES,
+ ASYNC);
+ return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
+ }
+ },
+ DELETESHARD_OP(DELETESHARD) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
+ return req.getParams().required().getAll(null,
+ COLLECTION_PROP,
+ SHARD_ID_PROP);
+ }
+ },
+ CREATESHARD_OP(CREATESHARD) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
+ Map<String, Object> map = req.getParams().required().getAll(null,
+ COLLECTION_PROP,
+ SHARD_ID_PROP);
+ ClusterState clusterState = handler.coreContainer.getZkController().getClusterState();
+ if (!ImplicitDocRouter.NAME.equals(((Map) clusterState.getCollection(req.getParams().get(COLLECTION_PROP)).get(DOC_ROUTER)).get(NAME)))
+ throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections");
+ req.getParams().getAll(map,
+ REPLICATION_FACTOR,
+ CREATE_NODE_SET, ASYNC);
+ return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
+ }
+ },
+ DELETEREPLICA_OP(DELETEREPLICA) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
+ Map<String, Object> map = req.getParams().required().getAll(null,
+ COLLECTION_PROP,
+ SHARD_ID_PROP,
+ REPLICA_PROP);
+ return req.getParams().getAll(map, ASYNC, ONLY_IF_DOWN);
+ }
+ },
+ MIGRATE_OP(MIGRATE) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+ Map<String, Object> map = req.getParams().required().getAll(null, COLLECTION_PROP, "split.key", "target.collection");
+ return req.getParams().getAll(map, "forward.timeout", ASYNC);
+ }
+ },
+ ADDROLE_OP(ADDROLE) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
+ Map<String, Object> map = req.getParams().required().getAll(null, "role", "node");
+ if (!KNOWN_ROLES.contains(map.get("role")))
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown role. Supported roles are ," + KNOWN_ROLES);
+ return map;
+ }
+ },
+ REMOVEROLE_OP(REMOVEROLE) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+ Map<String, Object> map = req.getParams().required().getAll(null, "role", "node");
+ if (!KNOWN_ROLES.contains(map.get("role")))
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown role. Supported roles are ," + KNOWN_ROLES);
+ return map;
+ }
+ },
+ CLUSTERPROP_OP(CLUSTERPROP) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+ String name = req.getParams().required().get(NAME);
+ String val = req.getParams().get(VALUE_LONG);
+ h.coreContainer.getZkController().getZkStateReader().setClusterProperty(name, val);
+ return null;
+ }
+ },
+ REQUESTSTATUS_OP(REQUESTSTATUS) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+ CoreContainer coreContainer = h.coreContainer;
+ req.getParams().required().check(REQUESTID);
+
+ String requestId = req.getParams().get(REQUESTID);
+
+ if (requestId.equals("-1")) {
+ // Special taskId (-1), clears up the request state maps.
+ if (requestId.equals("-1")) {
+ coreContainer.getZkController().getOverseerCompletedMap().clear();
+ coreContainer.getZkController().getOverseerFailureMap().clear();
+ return null;
+ }
+ } else {
+ NamedList<Object> results = new NamedList<>();
+ if (coreContainer.getZkController().getOverseerCompletedMap().contains(requestId)) {
+ SimpleOrderedMap success = new SimpleOrderedMap();
+ success.add("state", "completed");
+ success.add("msg", "found " + requestId + " in completed tasks");
+ results.add("status", success);
+ } else if (coreContainer.getZkController().getOverseerFailureMap().contains(requestId)) {
+ SimpleOrderedMap success = new SimpleOrderedMap();
+ success.add("state", "failed");
+ success.add("msg", "found " + requestId + " in failed tasks");
+ results.add("status", success);
+ } else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) {
+ SimpleOrderedMap success = new SimpleOrderedMap();
+ success.add("state", "running");
+ success.add("msg", "found " + requestId + " in running tasks");
+ results.add("status", success);
+ } else if (h.overseerCollectionQueueContains(requestId)) {
+ SimpleOrderedMap success = new SimpleOrderedMap();
+ success.add("state", "submitted");
+ success.add("msg", "found " + requestId + " in submitted tasks");
+ results.add("status", success);
+ } else {
+ SimpleOrderedMap failure = new SimpleOrderedMap();
+ failure.add("state", "notfound");
+ failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue");
+ results.add("status", failure);
+ }
+ SolrResponse response = new OverseerSolrResponse(results);
- rsp.getValues().addAll(response.getResponse());
- }
+ rsp.getValues().addAll(response.getResponse());
+ }
+ return null;
+ }
+ },
+ ADDREPLICA_OP(ADDREPLICA) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h)
+ throws Exception {
+ Map<String, Object> props = req.getParams().getAll(null,
+ COLLECTION_PROP,
+ "node",
+ SHARD_ID_PROP,
+ _ROUTE_,
+ CoreAdminParams.NAME,
+ INSTANCE_DIR,
+ DATA_DIR,
+ ASYNC);
+ return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX);
+ }
+ },
+ OVERSEERSTATUS_OP(OVERSEERSTATUS) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+ return new LinkedHashMap<>();
+ }
+ },
+
+ /**
+ * Handle list collection request.
+ * Do list collection request to zk host
+ */
+ LIST_OP(LIST) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
+ NamedList<Object> results = new NamedList<>();
+ Set<String> collections = handler.coreContainer.getZkController().getZkStateReader().getClusterState().getCollections();
+ List<String> collectionList = new ArrayList<>();
+ for (String collection : collections) {
+ collectionList.add(collection);
+ }
+ results.add("collections", collectionList);
+ SolrResponse response = new OverseerSolrResponse(results);
+ rsp.getValues().addAll(response.getResponse());
+ return null;
+ }
+ },
+ /**
+ * Handle cluster status request.
+ * Can return status per specific collection/shard or per all collections.
+ */
+ CLUSTERSTATUS_OP(CLUSTERSTATUS) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
+ throws KeeperException, InterruptedException {
+ return req.getParams().getAll(null,
+ COLLECTION_PROP,
+ SHARD_ID_PROP,
+ _ROUTE_);
+ }
+ },
+ ADDREPLICAPROP_OP(ADDREPLICAPROP) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+ Map<String, Object> map = req.getParams().required().getAll(null,
+ COLLECTION_PROP,
+ PROPERTY_PROP,
+ SHARD_ID_PROP,
+ REPLICA_PROP,
+ PROPERTY_VALUE_PROP);
+ req.getParams().getAll(map, SHARD_UNIQUE);
+ String property = (String) map.get(PROPERTY_PROP);
+ if (!property.startsWith(COLL_PROP_PREFIX)) {
+ property = COLL_PROP_PREFIX + property;
+ }
- public static ModifiableSolrParams params(String... params) {
- ModifiableSolrParams msp = new ModifiableSolrParams();
- for (int i=0; i<params.length; i+=2) {
- msp.add(params[i], params[i+1]);
+ boolean uniquePerSlice = Boolean.parseBoolean((String) map.get(SHARD_UNIQUE));
+
+ // Check if we're trying to set a property with parameters that allow us to set the property on multiple replicas
+ // in a slice on properties that are known to only be one-per-slice and error out if so.
+ if (StringUtils.isNotBlank((String) map.get(SHARD_UNIQUE)) &&
+ SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property.toLowerCase(Locale.ROOT)) &&
+ uniquePerSlice == false) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Overseer replica property command received for property " + property +
+ " with the " + SHARD_UNIQUE +
+ " parameter set to something other than 'true'. No action taken.");
+ }
+ return map;
+ }
+ },
+ DELETEREPLICAPROP_OP(DELETEREPLICAPROP) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+ Map<String, Object> map = req.getParams().required().getAll(null,
+ COLLECTION_PROP,
+ PROPERTY_PROP,
+ SHARD_ID_PROP,
+ REPLICA_PROP);
+ return req.getParams().getAll(map, PROPERTY_PROP);
+ }
+ },
+ BALANCESHARDUNIQUE_OP(BALANCESHARDUNIQUE) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+ Map<String, Object> map = req.getParams().required().getAll(null,
+ COLLECTION_PROP,
+ PROPERTY_PROP);
+ Boolean shardUnique = Boolean.parseBoolean(req.getParams().get(SHARD_UNIQUE));
+ String prop = req.getParams().get(PROPERTY_PROP).toLowerCase(Locale.ROOT);
+ if (!StringUtils.startsWith(prop, COLL_PROP_PREFIX)) {
+ prop = COLL_PROP_PREFIX + prop;
+ }
+
+ if (!shardUnique &&
+ !SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that"
+ + " the property be pre-defined as a unique property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true'. " +
+ " Property: " + prop + " shardUnique: " + Boolean.toString(shardUnique));
+ }
+
+ return req.getParams().getAll(map, ONLY_ACTIVE_NODES, SHARD_UNIQUE);
+ }
+ },
+ REBALANCELEADERS_OP(REBALANCELEADERS) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+ new RebalanceLeaders(req,rsp,h).execute();
+ return null;
+ }
+ };
+ CollectionAction action;
+ long timeOut;
+
+ CollectionOperation(CollectionAction action) {
+ this(action, DEFAULT_ZK_TIMEOUT);
}
- return msp;
- }
- //////////////////////// SolrInfoMBeans methods //////////////////////
+ CollectionOperation(CollectionAction action, long timeOut) {
+ this.action = action;
+ this.timeOut = timeOut;
+ }
- @Override
- public String getDescription() {
- return "Manage SolrCloud Collections";
+ /**
+ * All actions must implement this method. If a non null map is returned , the action name is added to
+ * the map and sent to overseer for processing. If it returns a null, the call returns immediately
+ */
+ abstract Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception;
+
+ public static CollectionOperation get(CollectionAction action) {
+ for (CollectionOperation op : values()) {
+ if (op.action == action) return op;
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, "No such action" + action);
+ }
}
- public static final String SYSTEM_COLL =".system";
}
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java?rev=1679397&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java Thu May 14 16:25:52 2015
@@ -0,0 +1,327 @@
+package org.apache.solr.handler.admin;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.solr.cloud.LeaderElector;
+import org.apache.solr.cloud.OverseerCollectionProcessor;
+import org.apache.solr.cloud.overseer.SliceMutator;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.zookeeper.KeeperException;
+
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
+
+class RebalanceLeaders {
+ final SolrQueryRequest req;
+ final SolrQueryResponse rsp;
+ final CollectionsHandler collectionsHandler;
+ final CoreContainer coreContainer;
+
+ RebalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler collectionsHandler) {
+ this.req = req;
+ this.rsp = rsp;
+ this.collectionsHandler = collectionsHandler;
+ coreContainer = collectionsHandler.getCoreContainer();
+ }
+
+ void execute() throws KeeperException, InterruptedException {
+ req.getParams().required().check(COLLECTION_PROP);
+
+ String collectionName = req.getParams().get(COLLECTION_PROP);
+ if (StringUtils.isBlank(collectionName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the REASSIGNLEADERS command."));
+ }
+ coreContainer.getZkController().getZkStateReader().updateClusterState(true);
+ ClusterState clusterState = coreContainer.getZkController().getClusterState();
+ DocCollection dc = clusterState.getCollection(collectionName);
+ if (dc == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
+ }
+ Map<String, String> currentRequests = new HashMap<>();
+ int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
+ if (max <= 0) max = Integer.MAX_VALUE;
+ int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
+ NamedList<Object> results = new NamedList<>();
+
+ boolean keepGoing = true;
+ for (Slice slice : dc.getSlices()) {
+ insurePreferredIsLeader(results, slice, currentRequests);
+ if (currentRequests.size() == max) {
+ CollectionsHandler.log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
+ keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results);
+ if (keepGoing == false) {
+ break; // If we've waited longer than specified, don't continue to wait!
+ }
+ }
+ }
+ if (keepGoing == true) {
+ keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results);
+ }
+ if (keepGoing == true) {
+ CollectionsHandler.log.info("All leader reassignments completed.");
+ } else {
+ CollectionsHandler.log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
+ }
+
+ rsp.getValues().addAll(results);
+ }
+
+ private void insurePreferredIsLeader(NamedList<Object> results,
+ Slice slice, Map<String, String> currentRequests) throws KeeperException, InterruptedException {
+ final String inactivePreferreds = "inactivePreferreds";
+ final String alreadyLeaders = "alreadyLeaders";
+ String collectionName = req.getParams().get(COLLECTION_PROP);
+
+ for (Replica replica : slice.getReplicas()) {
+ // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
+ if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
+ continue;
+ }
+ // OK, we are the preferred leader, are we the actual leader?
+ if (replica.getBool(LEADER_PROP, false)) {
+ //We're a preferred leader, but we're _also_ the leader, don't need to do anything.
+ NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
+ if (noops == null) {
+ noops = new NamedList<>();
+ results.add(alreadyLeaders, noops);
+ }
+ NamedList<Object> res = new NamedList<>();
+ res.add("status", "success");
+ res.add("msg", "Already leader");
+ res.add("shard", slice.getName());
+ res.add("nodeName", replica.getNodeName());
+ noops.add(replica.getName(), res);
+ return; // already the leader, do nothing.
+ }
+
+ // We're the preferred leader, but someone else is leader. Only become leader if we're active.
+ if (replica.getState() != Replica.State.ACTIVE) {
+ NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
+ if (inactives == null) {
+ inactives = new NamedList<>();
+ results.add(inactivePreferreds, inactives);
+ }
+ NamedList<Object> res = new NamedList<>();
+ res.add("status", "skipped");
+ res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
+ res.add("shard", slice.getName());
+ res.add("nodeName", replica.getNodeName());
+ inactives.add(replica.getName(), res);
+ return; // Don't try to become the leader if we're not active!
+ }
+
+ // Replica is the preferred leader but not the actual leader, do something about that.
+ // "Something" is
+ // 1> if the preferred leader isn't first in line, tell it to re-queue itself.
+ // 2> tell the actual leader to re-queue itself.
+
+ ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+
+ List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+ ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+ if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
+ CollectionsHandler.log.warn("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
+ "election queue, but replica " + replica.getName() + " doesn't think it's the leader. Do nothing");
+ return;
+ }
+
+ // Ok, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole
+ // string is used, but that sorts nodes with the same sequence number by their session IDs from ZK.
+ // While this is determinate, it's not quite what we need, so re-queue nodes that aren't us and are
+ // watching the leader node..
+
+ String firstWatcher = electionNodes.get(1);
+
+ if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) {
+ makeReplicaFirstWatcher(collectionName, slice, replica);
+ }
+
+ String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP);
+ rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false);
+ waitForNodeChange(collectionName, slice, electionNodes.get(0));
+
+
+ return; // Done with this slice, skip the rest of the replicas.
+ }
+ }
+ // Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list
+ void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica)
+ throws KeeperException, InterruptedException {
+
+ ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+ List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+ ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+ // First, queue up the preferred leader at the head of the queue.
+ int newSeq = -1;
+ for (String electionNode : electionNodes) {
+ if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
+ String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
+ rejoinElection(collectionName, slice, electionNode, coreName, true);
+ newSeq = waitForNodeChange(collectionName, slice, electionNode);
+ break;
+ }
+ }
+ if (newSeq == -1) {
+ return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
+ }
+
+ List<String> electionNodesTmp = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+ ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+
+ // Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
+ electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+ ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+ for (String thisNode : electionNodes) {
+ if (LeaderElector.getSeq(thisNode) > newSeq) {
+ break;
+ }
+ if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) {
+ continue;
+ }
+ if (LeaderElector.getSeq(thisNode) == newSeq) {
+ String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP);
+ rejoinElection(collectionName, slice, thisNode, coreName, false);
+ waitForNodeChange(collectionName, slice, thisNode);
+ }
+ }
+ }
+
+ int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException {
+ String nodeName = LeaderElector.getNodeName(electionNode);
+ int oldSeq = LeaderElector.getSeq(electionNode);
+ for (int idx = 0; idx < 600; ++idx) {
+ ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+ List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+ ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+ for (String testNode : electionNodes) {
+ if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) {
+ return LeaderElector.getSeq(testNode);
+ }
+ }
+
+ Thread.sleep(100);
+ }
+ return -1;
+ }
+ private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
+ boolean rejoinAtHead) throws KeeperException, InterruptedException {
+ Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode));
+ Map<String, Object> propMap = new HashMap<>();
+ propMap.put(COLLECTION_PROP, collectionName);
+ propMap.put(SHARD_ID_PROP, slice.getName());
+ propMap.put(QUEUE_OPERATION, REBALANCELEADERS.toLower());
+ propMap.put(CORE_NAME_PROP, core);
+ propMap.put(NODE_NAME_PROP, replica.getName());
+ propMap.put(ZkStateReader.BASE_URL_PROP, replica.getProperties().get(ZkStateReader.BASE_URL_PROP));
+ propMap.put(REJOIN_AT_HEAD_PROP, Boolean.toString(rejoinAtHead)); // Get ourselves to be first in line.
+ propMap.put(ELECTION_NODE_PROP, electionNode);
+ String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime());
+ propMap.put(ASYNC, asyncId);
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ SolrQueryResponse rspIgnore = new SolrQueryResponse(); // I'm constructing my own response
+ collectionsHandler.handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here.
+ }
+
+ // currentAsyncIds - map of request IDs and reporting data (value)
+ // maxWaitSecs - How long are we going to wait? Defaults to 30 seconds.
+ // waitForAll - if true, do not return until all assignments have been made.
+ // results - a place to stash results for reporting back to the user.
+ //
+ private boolean waitForLeaderChange(Map<String, String> currentAsyncIds, final int maxWaitSecs,
+ Boolean waitForAll, NamedList<Object> results)
+ throws KeeperException, InterruptedException {
+
+ if (currentAsyncIds.size() == 0) return true;
+
+ for (int idx = 0; idx < maxWaitSecs * 10; ++idx) {
+ Iterator<Map.Entry<String, String>> iter = currentAsyncIds.entrySet().iterator();
+ boolean foundChange = false;
+ while (iter.hasNext()) {
+ Map.Entry<String, String> pair = iter.next();
+ String asyncId = pair.getKey();
+ if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
+ coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
+ NamedList<Object> fails = (NamedList<Object>) results.get("failures");
+ if (fails == null) {
+ fails = new NamedList<>();
+ results.add("failures", fails);
+ }
+ NamedList<Object> res = new NamedList<>();
+ res.add("status", "failed");
+ res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader");
+ fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
+ iter.remove();
+ foundChange = true;
+ } else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) {
+ coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
+ NamedList<Object> successes = (NamedList<Object>) results.get("successes");
+ if (successes == null) {
+ successes = new NamedList<>();
+ results.add("successes", successes);
+ }
+ NamedList<Object> res = new NamedList<>();
+ res.add("status", "success");
+ res.add("msg", "Assigned '" + pair.getValue() + "' to be leader");
+ successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
+ iter.remove();
+ foundChange = true;
+ }
+ }
+ // We're done if we're processing a few at a time or all requests are processed.
+ if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) {
+ return true;
+ }
+ Thread.sleep(100); //TODO: Is there a better thing to do than sleep here?
+ }
+ return false;
+ }
+
+
+}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java?rev=1679397&r1=1679396&r2=1679397&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java Thu May 14 16:25:52 2015
@@ -25,6 +25,7 @@ import org.apache.solr.common.util.StrUt
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -48,14 +49,14 @@ public abstract class SolrParams impleme
String val = get(param);
return val==null ? def : val;
}
-
+
/** returns a RequiredSolrParams wrapping this */
public RequiredSolrParams required()
{
// TODO? should we want to stash a reference?
return new RequiredSolrParams(this);
}
-
+
protected String fpname(String field, String param) {
return "f."+field+'.'+param;
}
@@ -75,7 +76,7 @@ public abstract class SolrParams impleme
String val = get(fpname(field,param));
return val!=null ? val : get(param, def);
}
-
+
/** returns the String values of the field parameter, "f.field.param", or
* the values for "param" if that is not set.
*/
@@ -95,15 +96,15 @@ public abstract class SolrParams impleme
String val = get(param);
return val==null ? def : StrUtils.parseBool(val);
}
-
- /** Returns the Boolean value of the field param,
+
+ /** Returns the Boolean value of the field param,
or the value for param, or null if neither is set. */
public Boolean getFieldBool(String field, String param) {
String val = getFieldParam(field, param);
return val==null ? null : StrUtils.parseBool(val);
}
-
- /** Returns the boolean value of the field param,
+
+ /** Returns the boolean value of the field param,
or the value for param, or def if neither is set. */
public boolean getFieldBool(String field, String param, boolean def) {
String val = getFieldParam(field, param);
@@ -165,8 +166,8 @@ public abstract class SolrParams impleme
/**
- * @return The int value of the field param, or the value for param
- * or <code>null</code> if neither is set.
+ * @return The int value of the field param, or the value for param
+ * or <code>null</code> if neither is set.
**/
public Integer getFieldInt(String field, String param) {
String val = getFieldParam(field, param);
@@ -177,8 +178,8 @@ public abstract class SolrParams impleme
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, ex.getMessage(), ex );
}
}
-
- /** Returns the int value of the field param,
+
+ /** Returns the int value of the field param,
or the value for param, or def if neither is set. */
public int getFieldInt(String field, String param, int def) {
String val = getFieldParam(field, param);
@@ -323,7 +324,7 @@ public abstract class SolrParams impleme
// always use MultiMap for easier processing further down the chain
return new MultiMapSolrParams(toMultiMap(params));
}
-
+
/** Create filtered SolrParams. */
public SolrParams toFilteredSolrParams(List<String> names) {
NamedList<String> nl = new NamedList<>();
@@ -338,11 +339,11 @@ public abstract class SolrParams impleme
}
return toSolrParams(nl);
}
-
+
/** Convert this to a NamedList */
public NamedList<Object> toNamedList() {
final SimpleOrderedMap<Object> result = new SimpleOrderedMap<>();
-
+
for(Iterator<String> it=getParameterNamesIterator(); it.hasNext(); ) {
final String name = it.next();
final String [] values = getParams(name);
@@ -355,4 +356,22 @@ public abstract class SolrParams impleme
}
return result;
}
+
+ /**Copy all params to the given map or if the given map is null
+ * create a new one
+ */
+ public Map<String, Object> getAll(Map<String, Object> sink, String... params){
+ if(sink == null) sink = new LinkedHashMap<>();
+ for (String param : params) {
+ String[] v = getParams(param);
+ if(v != null && v.length>0 ) {
+ if(v.length == 1) {
+ sink.put(param, v[0]);
+ } else {
+ sink.put(param,v);
+ }
+ }
+ }
+ return sink;
+ }
}