You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2014/10/17 16:50:14 UTC
svn commit: r1632594 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/cloud/
core/src/java/org/apache/solr/handler/admin/
core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/common/cloud/
solrj/src/java/org/apache/solr/common/params/
Author: erick
Date: Fri Oct 17 14:50:14 2014
New Revision: 1632594
URL: http://svn.apache.org/r1632594
Log:
CollectionsAPI call REBALANCELEADERS
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RollingRestartTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestReplicaProperties.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Fri Oct 17 14:50:14 2014
@@ -178,6 +178,9 @@ New Features
* SOLR-4715: Add CloudSolrServer constructors which accept a HttpClient instance.
(Hardik Upadhyay, Shawn Heisey, shalin)
+* SOLR-6517: CollectionsAPI call REBALANCELEADERS. Used to balance leaders
+ across nodes for a particular collection
+
Bug Fixes
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Fri Oct 17 14:50:14 2014
@@ -212,7 +212,7 @@ final class ShardLeaderElectionContext e
int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
if (!weAreReplacement) {
- waitForReplicasToComeUp(weAreReplacement, leaderVoteWait);
+ waitForReplicasToComeUp(leaderVoteWait);
}
try (SolrCore core = cc.getCore(coreName)) {
@@ -226,7 +226,7 @@ final class ShardLeaderElectionContext e
// should I be leader?
if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
- rejoinLeaderElection(leaderSeqPath, core);
+ rejoinLeaderElection(core);
return;
}
@@ -297,7 +297,7 @@ final class ShardLeaderElectionContext e
}
}
if (!success) {
- rejoinLeaderElection(leaderSeqPath, core);
+ rejoinLeaderElection(core);
return;
}
@@ -323,7 +323,7 @@ final class ShardLeaderElectionContext e
core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
// we could not publish ourselves as leader - try and rejoin election
- rejoinLeaderElection(leaderSeqPath, core);
+ rejoinLeaderElection(core);
}
}
@@ -401,7 +401,7 @@ final class ShardLeaderElectionContext e
} // core gets closed automagically
}
- private void waitForReplicasToComeUp(boolean weAreReplacement, int timeoutms) throws InterruptedException {
+ private void waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
@@ -448,11 +448,11 @@ final class ShardLeaderElectionContext e
}
}
- private void rejoinLeaderElection(String leaderSeqPath, SolrCore core)
+ private void rejoinLeaderElection(SolrCore core)
throws InterruptedException, KeeperException, IOException {
// remove our ephemeral and re join the election
if (cc.isShutDown()) {
- log.info("Not rejoining election because CoreContainer is close");
+ log.info("Not rejoining election because CoreContainer is closed");
return;
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Fri Oct 17 14:50:14 2014
@@ -106,7 +106,6 @@ public class LeaderElector {
return;
}
// first we delete the node advertising the old leader in case the ephem is still there
- // first we delete the node advertising the old leader in case the ephem is still there
try {
zkClient.delete(context.leaderPath, -1, true);
}catch (KeeperException.NoNodeException nne){
@@ -244,7 +243,7 @@ public class LeaderElector {
try {
if(joinAtHead){
log.info("node {} Trying to join election at the head ", id);
- List<String> nodes = OverseerCollectionProcessor.getSortedElectionNodes(zkClient);
+ List<String> nodes = OverseerCollectionProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
if(nodes.size() <2){
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
CreateMode.EPHEMERAL_SEQUENTIAL, false);
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Oct 17 14:50:14 2014
@@ -130,7 +130,9 @@ public class Overseer implements Closeab
static enum LeaderStatus {DONT_KNOW, NO, YES}
- public static final Set<String> sliceUniqueBooleanProperties = ImmutableSet.of("property.preferredleader");
+ public static final String preferredLeaderProp = COLL_PROP_PREFIX + "preferredleader";
+
+ public static final Set<String> sliceUniqueBooleanProperties = ImmutableSet.of(preferredLeaderProp);
private long lastUpdatedTime = 0;
@@ -1169,7 +1171,7 @@ public class Overseer implements Closeab
return null;
}
- ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
+ private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
// System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
// System.out.println("Updating slice:" + slice);
DocCollection newCollection = null;
@@ -1396,7 +1398,6 @@ public class Overseer implements Closeab
}
}
-
// Class to encapsulate processing replica properties that have at most one replica hosting a property per slice.
private class ExclusiveSliceProperty {
private ClusterStateUpdater updater;
@@ -1698,6 +1699,7 @@ public class Overseer implements Closeab
this.replica = replica;
}
}
+
static void getShardNames(Integer numShards, List<String> shardNames) {
if(numShards == null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "numShards" + " is a required param");
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Fri Oct 17 14:50:14 2014
@@ -18,7 +18,9 @@ package org.apache.solr.cloud;
*/
import static org.apache.solr.cloud.Assign.getNodesForNewShard;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
@@ -31,9 +33,9 @@ import static org.apache.solr.common.par
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
import java.io.Closeable;
import java.io.IOException;
@@ -441,7 +443,7 @@ public class OverseerCollectionProcessor
String ldr = getLeaderNode(zk);
if(overseerDesignates.contains(ldr)) return;
log.info("prioritizing overseer nodes at {} overseer designates are {}", myId, overseerDesignates);
- List<String> electionNodes = getSortedElectionNodes(zk);
+ List<String> electionNodes = getSortedElectionNodes(zk, OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE);
if(electionNodes.size()<2) return;
log.info("sorted nodes {}", electionNodes);
@@ -484,10 +486,10 @@ public class OverseerCollectionProcessor
return nodeNames;
}
- public static List<String> getSortedElectionNodes(SolrZkClient zk) throws KeeperException, InterruptedException {
+ public static List<String> getSortedElectionNodes(SolrZkClient zk, String path) throws KeeperException, InterruptedException {
List<String> children = null;
try {
- children = zk.getChildren(OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE, null, true);
+ children = zk.getChildren(path, null, true);
LeaderElector.sortSeqs(children);
return children;
} catch (Exception e) {
@@ -651,6 +653,9 @@ public class OverseerCollectionProcessor
case BALANCESLICEUNIQUE:
balanceProperty(message);
break;
+ case REBALANCELEADERS:
+ processAssignLeaders(message);
+ break;
default:
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation);
@@ -677,6 +682,32 @@ public class OverseerCollectionProcessor
}
@SuppressWarnings("unchecked")
+ // re-purpose REBALANCELEADERS to reassign a single leader over here
+ private void processAssignLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
+ String collectionName = message.getStr(COLLECTION_PROP);
+ String shardId = message.getStr(SHARD_ID_PROP);
+ String baseURL = message.getStr(BASE_URL_PROP);
+ String coreName = message.getStr(CORE_NAME_PROP);
+
+ if (StringUtils.isBlank(collectionName) || StringUtils.isBlank(shardId) || StringUtils.isBlank(baseURL) ||
+ StringUtils.isBlank(coreName)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ String.format(Locale.ROOT, "The '%s', '%s', '%s' and '%s' parameters are required when assigning a leader",
+ COLLECTION_PROP, SHARD_ID_PROP, BASE_URL_PROP, CORE_NAME_PROP));
+ }
+ SolrZkClient zkClient = zkStateReader.getZkClient();
+ DistributedQueue inQueue = Overseer.getInQueue(zkClient);
+ Map<String, Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.LEADER.toLower());
+ propMap.put(COLLECTION_PROP, collectionName);
+ propMap.put(SHARD_ID_PROP, shardId);
+ propMap.put(BASE_URL_PROP, baseURL);
+ propMap.put(CORE_NAME_PROP, coreName);
+ inQueue.offer(zkStateReader.toJSON(propMap));
+ }
+
+
+ @SuppressWarnings("unchecked")
private void processReplicaAddPropertyCommand(ZkNodeProps message) throws KeeperException, InterruptedException {
if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) ||
StringUtils.isBlank(message.getStr(SHARD_ID_PROP)) ||
@@ -684,7 +715,7 @@ public class OverseerCollectionProcessor
StringUtils.isBlank(message.getStr(PROPERTY_PROP)) ||
StringUtils.isBlank(message.getStr(PROPERTY_VALUE_PROP))) {
throw new SolrException(ErrorCode.BAD_REQUEST,
- String.format(Locale.ROOT, "The '%s', '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete' operations",
+ String.format(Locale.ROOT, "The '%s', '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete operations",
COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP));
}
SolrZkClient zkClient = zkStateReader.getZkClient();
@@ -702,7 +733,7 @@ public class OverseerCollectionProcessor
StringUtils.isBlank(message.getStr(REPLICA_PROP)) ||
StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
throw new SolrException(ErrorCode.BAD_REQUEST,
- String.format(Locale.ROOT, "The '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete' operations",
+ String.format(Locale.ROOT, "The '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete operations",
COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP));
}
SolrZkClient zkClient = zkStateReader.getZkClient();
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Oct 17 14:50:14 2014
@@ -834,7 +834,13 @@ public final class ZkController {
ZkNodeProps leaderProps = new ZkNodeProps(props);
try {
- joinElection(desc, afterExpiration);
+ // If we're a preferred leader, insert ourselves at the head of the queue
+ boolean joinAtHead = false;
+ Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(), coreZkNodeName);
+ if (replica != null) {
+ joinAtHead = replica.getBool(Overseer.preferredLeaderProp, false);
+ }
+ joinElection(desc, afterExpiration, joinAtHead);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
@@ -988,7 +994,8 @@ public final class ZkController {
}
- private void joinElection(CoreDescriptor cd, boolean afterExpiration) throws InterruptedException, KeeperException, IOException {
+ private void joinElection(CoreDescriptor cd, boolean afterExpiration, boolean joinAtHead)
+ throws InterruptedException, KeeperException, IOException {
// look for old context - if we find it, cancel it
String collection = cd.getCloudDescriptor().getCollectionName();
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
@@ -1018,7 +1025,7 @@ public final class ZkController {
leaderElector.setup(context);
electionContexts.put(contextKey, context);
- leaderElector.joinElection(context, false);
+ leaderElector.joinElection(context, false, joinAtHead);
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Fri Oct 17 14:50:14 2014
@@ -30,16 +30,23 @@ import static org.apache.solr.cloud.Over
import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
+import static org.apache.solr.common.cloud.ZkStateReader.ACTIVE;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.STATE_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESLICEUNIQUE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESLICEUNIQUE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
@@ -51,6 +58,7 @@ import static org.apache.solr.common.par
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
@@ -80,6 +88,8 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -252,6 +262,10 @@ public class CollectionsHandler extends
this.handleBalanceSliceUnique(req, rsp);
break;
}
+ case REBALANCELEADERS: {
+ this.handleBalanceLeaders(req, rsp);
+ break;
+ }
default: {
throw new RuntimeException("Unknown action: " + action);
}
@@ -260,6 +274,156 @@ public class CollectionsHandler extends
rsp.setHttpCaching(false);
}
+
+ private void handleBalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+ req.getParams().required().check(COLLECTION_PROP);
+
+ String collectionName = req.getParams().get(COLLECTION_PROP);
+ if (StringUtils.isBlank(collectionName)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the REASSIGNLEADERS command."));
+ }
+ coreContainer.getZkController().getZkStateReader().updateClusterState(true);
+ ClusterState clusterState = coreContainer.getZkController().getClusterState();
+ DocCollection dc = clusterState.getCollection(collectionName);
+ if (dc == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
+ }
+ Map<String, String> current = new HashMap<>();
+ int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
+ if (max <= 0) max = Integer.MAX_VALUE;
+ int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
+ NamedList<Object> results = new NamedList<>();
+ SolrQueryResponse rspIgnore = new SolrQueryResponse();
+ final String inactivePreferreds = "inactivePreferreds";
+ final String alreadyLeaders = "alreadyLeaders";
+ boolean keepGoing = true;
+ for (Slice slice : dc.getSlices()) {
+ for (Replica replica : slice.getReplicas()) {
+ // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
+ if (replica.getBool(Overseer.preferredLeaderProp, false) == false) {
+ continue;
+ }
+ if (StringUtils.equalsIgnoreCase(replica.getStr(STATE_PROP), ACTIVE) == false) {
+ NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
+ if (inactives == null) {
+ inactives = new NamedList<>();
+ results.add(inactivePreferreds, inactives);
+ }
+ NamedList<Object> res = new NamedList<>();
+ res.add("status", "skipped");
+ res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
+ res.add("nodeName", replica.getNodeName());
+ inactives.add(replica.getName(), res);
+ break; // Don't try to assign if we're not active!
+ } // OK, we're the one, get in the queue to become the leader.
+ if (replica.getBool(LEADER_PROP, false)) {
+ NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
+ if (noops == null) {
+ noops = new NamedList<>();
+ results.add(alreadyLeaders, noops);
+ }
+ NamedList<Object> res = new NamedList<>();
+ res.add("status", "success");
+ res.add("msg", "Already leader");
+ res.add("nodeName", replica.getNodeName());
+ noops.add(replica.getName(), res);
+ break; // already the leader, do nothing.
+ }
+ Map<String, Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, REBALANCELEADERS.toLower());
+ propMap.put(COLLECTION_PROP, collectionName);
+ propMap.put(SHARD_ID_PROP, slice.getName());
+ propMap.put(BASE_URL_PROP, replica.get(BASE_URL_PROP));
+
+ String coreName = (String) replica.get(CORE_NAME_PROP);
+ // Put it in the waiting list.
+ String asyncId = REBALANCELEADERS.toLower() + "_" + coreName;
+ current.put(asyncId, String.format(Locale.ROOT, "Collection: '%s', Shard: '%s', Core: '%s', BaseUrl: '%s'",
+ collectionName, slice.getName(), coreName, replica.get(BASE_URL_PROP)));
+
+ propMap.put(CORE_NAME_PROP, coreName);
+ propMap.put(ASYNC, asyncId);
+
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ log.info("Queueing collection '" + collectionName + "' slice '" + slice.getName() + "' replica '" +
+ coreName + "' to become leader.");
+ handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here.
+ break; // Done with this slice, skip the rest of the replicas.
+ }
+ if (current.size() == max) {
+ log.info("Queued " + max + " leader reassgnments, waiting for some to complete.");
+ keepGoing = waitForLeaderChange(current, maxWaitSecs, false, results);
+ if (keepGoing == false) {
+ break; // If we've waited longer than specified, don't continue to wait!
+ }
+ }
+ }
+ if (keepGoing == true) {
+ keepGoing = waitForLeaderChange(current, maxWaitSecs, true, results);
+ }
+ if (keepGoing == true) {
+ log.info("All leader reassignments completed.");
+ } else {
+ log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
+ }
+
+ rsp.getValues().addAll(results);
+ }
+
+ // currentAsyncIds - map of request IDs and reporting data (value)
+ // maxWaitSecs - How long are we going to wait, in seconds? The caller (handleBalanceLeaders) defaults this to 60 seconds.
+ // waitForAll - if true, do not return until all assignments have been made or maxWaitSecs has elapsed.
+ // results - a place to stash results for reporting back to the user.
+ //
+ private boolean waitForLeaderChange(Map<String, String> currentAsyncIds, final int maxWaitSecs,
+ Boolean waitForAll, NamedList<Object> results)
+ throws KeeperException, InterruptedException {
+
+ if (currentAsyncIds.size() == 0) return true;
+
+ for (int idx = 0; idx < maxWaitSecs * 10; ++idx) {
+ Iterator<Map.Entry<String, String>> iter = currentAsyncIds.entrySet().iterator();
+ boolean foundChange = false;
+ while (iter.hasNext()) {
+ Map.Entry<String, String> pair = iter.next();
+ String asyncId = pair.getKey();
+ if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
+ coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
+ NamedList<Object> fails = (NamedList<Object>) results.get("failures");
+ if (fails == null) {
+ fails = new NamedList<>();
+ results.add("failures", fails);
+ }
+ NamedList<Object> res = new NamedList<>();
+ res.add("status", "failed");
+ res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader");
+ fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
+ iter.remove();
+ foundChange = true;
+ } else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) {
+ coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
+ NamedList<Object> successes = (NamedList<Object>) results.get("successes");
+ if (successes == null) {
+ successes = new NamedList<>();
+ results.add("successes", successes);
+ }
+ NamedList<Object> res = new NamedList<>();
+ res.add("status", "success");
+ res.add("msg", "Assigned '" + pair.getValue() + "' to be leader");
+ successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
+ iter.remove();
+ foundChange = true;
+ }
+ }
+ // We're done if we're processing a few at a time or all requests are processed.
+ if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) {
+ return true;
+ }
+ Thread.sleep(100); //TODO: Is there a better thing to do than sleep here?
+ }
+ return false;
+ }
private void handleAddReplicaProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_VALUE_PROP);
@@ -425,7 +589,7 @@ public class CollectionsHandler extends
}
NamedList<String> r = new NamedList<>();
-
+
if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java Fri Oct 17 14:50:14 2014
@@ -197,7 +197,9 @@ public class OverseerRolesTest extends
assertNotNull("Could not find a jetty2 kill", leaderJetty);
log.info("leader node {}", leaderJetty.getBaseUrl());
- log.info ("current election Queue", OverseerCollectionProcessor.getSortedElectionNodes(client.getZkStateReader().getZkClient()));
+ log.info ("current election Queue",
+ OverseerCollectionProcessor.getSortedElectionNodes(client.getZkStateReader().getZkClient(),
+ OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE));
ChaosMonkey.stop(leaderJetty);
timeout = System.currentTimeMillis() + 10000;
leaderchanged = false;
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RollingRestartTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RollingRestartTest.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RollingRestartTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RollingRestartTest.java Fri Oct 17 14:50:14 2014
@@ -106,7 +106,9 @@ public class RollingRestartTest extends
if (!success) {
leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
if (leader == null)
- log.error("NOOVERSEER election queue is :" + OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient()));
+ log.error("NOOVERSEER election queue is :" +
+ OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
+ OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE));
fail("No overseer designate as leader found after restart #" + (i + 1) + ": " + leader);
}
}
@@ -115,7 +117,9 @@ public class RollingRestartTest extends
if (!success) {
leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
if (leader == null)
- log.error("NOOVERSEER election queue is :" + OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient()));
+ log.error("NOOVERSEER election queue is :" +
+ OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
+ OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE));
fail("No overseer leader found after restart #" + (i + 1) + ": " + leader);
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestReplicaProperties.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestReplicaProperties.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestReplicaProperties.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestReplicaProperties.java Fri Oct 17 14:50:14 2014
@@ -29,10 +29,13 @@ import org.apache.solr.client.solrj.Solr
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
import org.junit.Before;
@Slow
@@ -61,7 +64,7 @@ public class TestReplicaProperties exten
// shards, replicationfactor, maxreplicaspernode
int shards = random().nextInt(7);
if (shards < 2) shards = 2;
- int rFactor = random().nextInt(3);
+ int rFactor = random().nextInt(4);
if (rFactor < 2) rFactor = 2;
createCollection(null, COLLECTION_NAME, shards, rFactor, shards * rFactor + 1, client, null, "conf1");
} finally {
@@ -186,11 +189,56 @@ public class TestReplicaProperties exten
verifyPropertyVal(client, COLLECTION_NAME,
c1_s1_r2, "property.bogus1", "whatever");
+ // At this point we've assigned a preferred leader. Make it happen and check that all the nodes that are
+ // leaders _also_ have the preferredLeader property set.
+
+
+ doPropertyAction(client,
+ "action", CollectionParams.CollectionAction.REBALANCELEADERS.toString(),
+ "collection", COLLECTION_NAME);
+
+ verifyLeaderAssignment(client, COLLECTION_NAME);
+
} finally {
client.shutdown();
}
}
+ private void verifyLeaderAssignment(CloudSolrServer client, String collectionName)
+ throws InterruptedException, KeeperException {
+ String lastFailMsg = "";
+ for (int idx = 0; idx < 300; ++idx) { // Keep trying while Overseer writes the ZK state for up to 30 seconds.
+ lastFailMsg = "";
+ client.getZkStateReader().updateClusterState(true);
+ ClusterState clusterState = client.getZkStateReader().getClusterState();
+ for (Slice slice : clusterState.getSlices(collectionName)) {
+ Boolean foundLeader = false;
+ Boolean foundPreferred = false;
+ for (Replica replica : slice.getReplicas()) {
+ Boolean isLeader = replica.getBool("leader", false);
+ Boolean isPreferred = replica.getBool("property.preferredleader", false);
+ if (isLeader != isPreferred) {
+ lastFailMsg = "Replica should NOT have preferredLeader != leader. Preferred: " + isPreferred.toString() +
+ " leader is " + isLeader.toString();
+ }
+ if (foundLeader && isLeader) {
+ lastFailMsg = "There should only be a single leader in _any_ shard! Replica " + replica.getName() +
+ " is the second leader in slice " + slice.getName();
+ }
+ if (foundPreferred && isPreferred) {
+ lastFailMsg = "There should only be a single preferredLeader in _any_ shard! Replica " + replica.getName() +
+ " is the second preferredLeader in slice " + slice.getName();
+ }
+ foundLeader = foundLeader ? foundLeader : isLeader;
+ foundPreferred = foundPreferred ? foundPreferred : isPreferred;
+ }
+ }
+ if (lastFailMsg.length() == 0) return;
+ Thread.sleep(100);
+ }
+ fail(lastFailMsg);
+ }
+
private void addProperty(CloudSolrServer client, String... paramsIn) throws IOException, SolrServerException {
assertTrue("paramsIn must be an even multiple of 2, it is: " + paramsIn.length, (paramsIn.length % 2) == 0);
ModifiableSolrParams params = new ModifiableSolrParams();
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Fri Oct 17 14:50:14 2014
@@ -71,7 +71,8 @@ public class ZkStateReader implements Cl
public static final String LEADER_PROP = "leader";
public static final String PROPERTY_PROP = "property";
public static final String PROPERTY_VALUE_PROP = "property.value";
-
+ public static final String MAX_AT_ONCE_PROP = "maxAtOnce";
+ public static final String MAX_WAIT_SECONDS_PROP = "maxWaitSeconds";
public static final String COLLECTIONS_ZKNODE = "/collections";
public static final String LIVE_NODES_ZKNODE = "/live_nodes";
public static final String ALIASES = "/aliases.json";
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Fri Oct 17 14:50:14 2014
@@ -49,7 +49,8 @@ public interface CollectionParams
CLUSTERSTATUS,
ADDREPLICAPROP,
DELETEREPLICAPROP,
- BALANCESLICEUNIQUE;
+ BALANCESLICEUNIQUE,
+ REBALANCELEADERS;
public static CollectionAction get( String p )
{