You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2014/12/24 23:55:36 UTC
svn commit: r1647857 - in /lucene/dev/trunk/solr:
core/src/java/org/apache/solr/cloud/
core/src/java/org/apache/solr/handler/admin/
core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/common/cloud/
solrj/src/java/org/apache/solr/common/...
Author: erick
Date: Wed Dec 24 22:55:36 2014
New Revision: 1647857
URL: http://svn.apache.org/r1647857
Log:
SOLR-6691: REBALANCELEADERS needs to change the leader election queue. Going to let this bake in trunk until 5.0 is cut.
Added:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Wed Dec 24 22:55:36 2014
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -86,7 +87,7 @@ public class LeaderElector {
*
* @param replacement has someone else been the leader already?
*/
- private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement) throws KeeperException,
+ private void checkIfIamLeader(final ElectionContext context, boolean replacement) throws KeeperException,
InterruptedException, IOException {
context.checkIfIamLeaderFired();
// get all other numbers...
@@ -99,10 +100,44 @@ public class LeaderElector {
log.warn("Our node is no longer in line to be leader");
return;
}
+ // We can't really rely on the sequence number stored in the old watcher, it may be stale, thus this check.
+
+ int seq = -1;
+
+ // See if we've already been re-added, and this is an old context. In which case, use our current sequence number.
+ String newLeaderSeq = "";
+ for (String elec : seqs) {
+ if (getNodeName(elec).equals(getNodeName(context.leaderSeqPath)) && seq < getSeq(elec)) {
+ seq = getSeq(elec); // so use the current sequence number.
+ newLeaderSeq = elec;
+ break;
+ }
+ }
+
+ // Now, if we've been re-added, presumably we've also set up watchers and all that kind of thing, so we're done
+ if (StringUtils.isNotBlank(newLeaderSeq) && seq > getSeq(context.leaderSeqPath)) {
+ log.info("Node " + context.leaderSeqPath + " already in queue as " + newLeaderSeq + " nothing to do.");
+ return;
+ }
+
+ // Fallback in case we're all coming in here fresh and there is no node for this core already in the election queue.
+ if (seq == -1) {
+ seq = getSeq(context.leaderSeqPath);
+ }
+
if (seq <= intSeqs.get(0)) {
- if(seq == intSeqs.get(0) && !context.leaderSeqPath.equals(holdElectionPath+"/"+seqs.get(0)) ) {//somebody else already became the leader with the same sequence id , not me
- log.info("was going be leader {} , seq(0) {}",context.leaderSeqPath,holdElectionPath+"/"+seqs.get(0));//but someone else jumped the line
- retryElection(context,false);//join at the tail again
+ if (seq == intSeqs.get(0) && !context.leaderSeqPath.equals(holdElectionPath + "/" + seqs.get(0))) {//somebody else already became the leader with the same sequence id , not me
+ log.info("was going to be leader {} , seq(0) {}", context.leaderSeqPath, holdElectionPath + "/" + seqs.get(0));//but someone else jumped the line
+
+ // The problem is that deleting the ZK node that's watched by others
+ // results in an unpredictable sequencing of the events and sometime the context that comes in for checking
+ // this happens to be after the node has already taken over leadership. So just leave out of here.
+ // This caused one of the tests to fail on having two nodes with the same name in the queue. I'm not sure
+ // the assumption that this is a bad state is valid.
+ if (getNodeName(context.leaderSeqPath).equals(getNodeName(seqs.get(0)))) {
+ return;
+ }
+ retryElection(context, false);//join at the tail again
return;
}
// first we delete the node advertising the old leader in case the ephem is still there
@@ -129,21 +164,22 @@ public class LeaderElector {
}
} else {
// I am not the leader - watch the node below me
- int i = 1;
- for (; i < intSeqs.size(); i++) {
- int s = intSeqs.get(i);
- if (seq < s) {
- // we found who we come before - watch the guy in front
+ int toWatch = -1;
+ for (int idx = 0; idx < intSeqs.size(); idx++) {
+ if (intSeqs.get(idx) < seq && ! getNodeName(context.leaderSeqPath).equals(getNodeName(seqs.get(idx)))) {
+ toWatch = idx;
+ }
+ if (intSeqs.get(idx) >= seq) {
break;
}
}
- int index = i - 2;
- if (index < 0) {
+ if (toWatch < 0) {
log.warn("Our node is no longer in line to be leader");
return;
}
try {
- String watchedNode = holdElectionPath + "/" + seqs.get(index);
+ String watchedNode = holdElectionPath + "/" + seqs.get(toWatch);
+
zkClient.getData(watchedNode, watcher = new ElectionWatcher(context.leaderSeqPath , watchedNode,seq, context) , null, true);
} catch (KeeperException.SessionExpiredException e) {
throw e;
@@ -151,7 +187,7 @@ public class LeaderElector {
log.warn("Failed setting watch", e);
// we couldn't set our watch - the node before us may already be down?
// we need to check if we are the leader again
- checkIfIamLeader(seq, context, true);
+ checkIfIamLeader(context, true);
}
}
}
@@ -309,15 +345,13 @@ public class LeaderElector {
}
}
}
- int seq = getSeq(leaderSeqPath);
- checkIfIamLeader(seq, context, replacement);
-
- return seq;
+ checkIfIamLeader(context, replacement);
+
+ return getSeq(context.leaderSeqPath);
}
private class ElectionWatcher implements Watcher {
final String myNode,watchedNode;
- final int seq;
final ElectionContext context;
private boolean canceled = false;
@@ -325,11 +359,10 @@ public class LeaderElector {
private ElectionWatcher(String myNode, String watchedNode, int seq, ElectionContext context) {
this.myNode = myNode;
this.watchedNode = watchedNode;
- this.seq = seq;
this.context = context;
}
- void cancel(String leaderSeqPath){
+ void cancel() {
canceled = true;
}
@@ -354,7 +387,7 @@ public class LeaderElector {
}
try {
// am I the next leader?
- checkIfIamLeader(seq, context, true);
+ checkIfIamLeader(context, true);
} catch (Exception e) {
log.warn("", e);
}
@@ -390,7 +423,7 @@ public class LeaderElector {
void retryElection(ElectionContext context, boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
ElectionWatcher watcher = this.watcher;
ElectionContext ctx = context.copy();
- if(watcher!= null) watcher.cancel(this.context.leaderSeqPath);
+ if (watcher != null) watcher.cancel();
this.context.cancelElection();
this.context = ctx;
joinElection(ctx, true, joinAtHead);
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Wed Dec 24 22:55:36 2014
@@ -21,6 +21,9 @@ import static org.apache.solr.cloud.Assi
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
@@ -659,7 +662,7 @@ public class OverseerCollectionProcessor
balanceProperty(message);
break;
case REBALANCELEADERS:
- processAssignLeaders(message);
+ processRebalanceLeaders(message);
break;
default:
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
@@ -687,42 +690,36 @@ public class OverseerCollectionProcessor
}
@SuppressWarnings("unchecked")
- // re-purpose BALANCELEADERS to reassign a single leader over here
- private void processAssignLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
- String collectionName = message.getStr(COLLECTION_PROP);
- String shardId = message.getStr(SHARD_ID_PROP);
- String baseURL = message.getStr(BASE_URL_PROP);
- String coreName = message.getStr(CORE_NAME_PROP);
-
- if (StringUtils.isBlank(collectionName) || StringUtils.isBlank(shardId) || StringUtils.isBlank(baseURL) ||
- StringUtils.isBlank(coreName)) {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- String.format(Locale.ROOT, "The '%s', '%s', '%s' and '%s' parameters are required when assigning a leader",
- COLLECTION_PROP, SHARD_ID_PROP, BASE_URL_PROP, CORE_NAME_PROP));
- }
- SolrZkClient zkClient = zkStateReader.getZkClient();
- DistributedQueue inQueue = Overseer.getInQueue(zkClient);
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower());
- propMap.put(COLLECTION_PROP, collectionName);
- propMap.put(SHARD_ID_PROP, shardId);
- propMap.put(BASE_URL_PROP, baseURL);
- propMap.put(CORE_NAME_PROP, coreName);
- inQueue.offer(zkStateReader.toJSON(propMap));
- }
+  /**
+   * Forwards a REBALANCELEADERS message to the node hosting the target core as a
+   * REJOINLEADERELECTION core admin request.
+   *
+   * All election coordinates are copied from the message unchanged. The request is submitted to the
+   * shard handler; this method does not wait for or inspect the response.
+   *
+   * NOTE(review): {@code nodeName} on the ShardRequest is populated from the core name — confirm
+   * that is what ShardRequest consumers expect.
+   */
+  private void processRebalanceLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
+    checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
+        NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
+
+    ModifiableSolrParams coreParams = new ModifiableSolrParams();
+    for (String key : new String[] {COLLECTION_PROP, SHARD_ID_PROP, REJOIN_AT_HEAD_PROP}) {
+      coreParams.set(key, message.getStr(key));
+    }
+    coreParams.set(CoreAdminParams.ACTION, CoreAdminAction.REJOINLEADERELECTION.toString());
+    for (String key : new String[] {CORE_NAME_PROP, NODE_NAME_PROP, ELECTION_NODE_PROP, BASE_URL_PROP}) {
+      coreParams.set(key, message.getStr(key));
+    }
+    // yes, they must use same admin handler path everywhere...
+    coreParams.set("qt", adminPath);
+
+    String targetUrl = message.getStr(BASE_URL_PROP);
+    ShardRequest shardReq = new ShardRequest();
+    shardReq.nodeName = message.getStr(ZkStateReader.CORE_NAME_PROP);
+    shardReq.purpose = ShardRequest.PURPOSE_PRIVATE;
+    shardReq.shards = new String[] {targetUrl};
+    shardReq.actualShards = shardReq.shards;
+    shardReq.params = coreParams;
+
+    shardHandlerFactory.getShardHandler().submit(shardReq, targetUrl, shardReq.params);
+  }
@SuppressWarnings("unchecked")
private void processReplicaAddPropertyCommand(ZkNodeProps message) throws KeeperException, InterruptedException {
- if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) ||
- StringUtils.isBlank(message.getStr(SHARD_ID_PROP)) ||
- StringUtils.isBlank(message.getStr(REPLICA_PROP)) ||
- StringUtils.isBlank(message.getStr(PROPERTY_PROP)) ||
- StringUtils.isBlank(message.getStr(PROPERTY_VALUE_PROP))) {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- String.format(Locale.ROOT, "The '%s', '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete operations",
- COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP));
- }
+ checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
SolrZkClient zkClient = zkStateReader.getZkClient();
DistributedQueue inQueue = Overseer.getInQueue(zkClient);
Map<String, Object> propMap = new HashMap<>();
@@ -733,14 +730,7 @@ public class OverseerCollectionProcessor
}
private void processReplicaDeletePropertyCommand(ZkNodeProps message) throws KeeperException, InterruptedException {
- if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) ||
- StringUtils.isBlank(message.getStr(SHARD_ID_PROP)) ||
- StringUtils.isBlank(message.getStr(REPLICA_PROP)) ||
- StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
- throw new SolrException(ErrorCode.BAD_REQUEST,
- String.format(Locale.ROOT, "The '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete operations",
- COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP));
- }
+ checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
SolrZkClient zkClient = zkStateReader.getZkClient();
DistributedQueue inQueue = Overseer.getInQueue(zkClient);
Map<String, Object> propMap = new HashMap<>();
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Dec 24 22:55:36 2014
@@ -80,6 +80,15 @@ import org.apache.solr.core.SolrResource
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateShardHandler;
+
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -1022,7 +1031,7 @@ public final class ZkController {
ZkNodeProps ourProps = new ZkNodeProps(props);
-
+
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
collection, coreNodeName, ourProps, this, cc);
@@ -1860,6 +1869,31 @@ public final class ZkController {
}
+  /**
+   * Re-queues a core in its shard's leader election on behalf of a REJOINLEADERELECTION core admin request.
+   *
+   * @param params carries COLLECTION_PROP, SHARD_ID_PROP, NODE_NAME_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
+   *               BASE_URL_PROP and, optionally, REJOIN_AT_HEAD_PROP (defaults to false, i.e. join at the tail).
+   * @throws SolrException with SERVER_ERROR if the election could not be rejoined; the cause is preserved.
+   */
+  public void rejoinShardLeaderElection(SolrParams params) {
+    try {
+      String collectionName = params.get(COLLECTION_PROP);
+      String shardId = params.get(SHARD_ID_PROP);
+      // NOTE(review): this value is passed in the coreNodeName position of ShardLeaderElectionContext below;
+      // confirm the sender puts the core node name (not the live node name) under NODE_NAME_PROP.
+      String nodeName = params.get(NODE_NAME_PROP);
+      String coreName = params.get(CORE_NAME_PROP);
+      String electionNode = params.get(ELECTION_NODE_PROP);
+      String baseUrl = params.get(BASE_URL_PROP);
+      // Use the defaulting overload: getBool(String) yields a null Boolean when the parameter is absent,
+      // and unboxing that null would throw a NullPointerException.
+      boolean rejoinAtHead = params.getBool(REJOIN_AT_HEAD_PROP, false);
+
+      ZkNodeProps zkProps = new ZkNodeProps(CORE_NAME_PROP, coreName, NODE_NAME_PROP, nodeName, COLLECTION_PROP, collectionName,
+          SHARD_ID_PROP, shardId, ELECTION_NODE_PROP, electionNode, BASE_URL_PROP, baseUrl);
+
+      // NOTE(review): the context is built with this controller's leaderElector but driven by the fresh
+      // LeaderElector created below — confirm that pairing is intended.
+      ShardLeaderElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId, collectionName,
+          nodeName, zkProps, this, getCoreContainer());
+      LeaderElector elect = new LeaderElector(this.zkClient);
+      // Point the context at the existing election node named by ELECTION_NODE_PROP.
+      context.leaderSeqPath = context.electionPath + LeaderElector.ELECTION_NODE + "/" + electionNode;
+      elect.setup(context);
+
+      elect.retryElection(context, rejoinAtHead);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
+    } catch (Exception e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
+    }
+  }
+
public void checkOverseerDesignate() {
try {
byte[] data = zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true);
@@ -2280,7 +2314,7 @@ public final class ZkController {
private void setConfWatcher(String zkDir, Watcher watcher) {
try {
- zkClient.exists(zkDir,watcher,true);
+ zkClient.exists(zkDir, watcher, true);
} catch (KeeperException e) {
log.error("failed to set watcher for conf dir {} ", zkDir);
} catch (InterruptedException e) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Wed Dec 24 22:55:36 2014
@@ -32,10 +32,11 @@ import static org.apache.solr.cloud.Over
import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.ACTIVE;
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
@@ -44,6 +45,7 @@ import static org.apache.solr.common.clo
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.STATE_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
@@ -69,6 +71,7 @@ import java.nio.charset.StandardCharsets
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@@ -82,6 +85,7 @@ import org.apache.solr.client.solrj.requ
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+import org.apache.solr.cloud.LeaderElector;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerCollectionProcessor;
import org.apache.solr.cloud.OverseerSolrResponse;
@@ -295,78 +299,25 @@ public class CollectionsHandler extends
if (dc == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
}
- Map<String, String> current = new HashMap<>();
+ Map<String, String> currentRequests = new HashMap<>();
int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
if (max <= 0) max = Integer.MAX_VALUE;
int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
NamedList<Object> results = new NamedList<>();
- SolrQueryResponse rspIgnore = new SolrQueryResponse();
- final String inactivePreferreds = "inactivePreferreds";
- final String alreadyLeaders = "alreadyLeaders";
+
boolean keepGoing = true;
for (Slice slice : dc.getSlices()) {
- for (Replica replica : slice.getReplicas()) {
- // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
- if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
- continue;
- }
- if (StringUtils.equalsIgnoreCase(replica.getStr(STATE_PROP), ACTIVE) == false) {
- NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
- if (inactives == null) {
- inactives = new NamedList<>();
- results.add(inactivePreferreds, inactives);
- }
- NamedList<Object> res = new NamedList<>();
- res.add("status", "skipped");
- res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
- res.add("nodeName", replica.getNodeName());
- inactives.add(replica.getName(), res);
- break; // Don't try to assign if we're not active!
- } // OK, we're the one, get in the queue to become the leader.
- if (replica.getBool(LEADER_PROP, false)) {
- NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
- if (noops == null) {
- noops = new NamedList<>();
- results.add(alreadyLeaders, noops);
- }
- NamedList<Object> res = new NamedList<>();
- res.add("status", "success");
- res.add("msg", "Already leader");
- res.add("nodeName", replica.getNodeName());
- noops.add(replica.getName(), res);
- break; // already the leader, do nothing.
- }
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, REBALANCELEADERS.toLower());
- propMap.put(COLLECTION_PROP, collectionName);
- propMap.put(SHARD_ID_PROP, slice.getName());
- propMap.put(BASE_URL_PROP, replica.get(BASE_URL_PROP));
-
- String coreName = (String) replica.get(CORE_NAME_PROP);
- // Put it in the waiting list.
- String asyncId = REBALANCELEADERS.toLower() + "_" + coreName;
- current.put(asyncId, String.format(Locale.ROOT, "Collection: '%s', Shard: '%s', Core: '%s', BaseUrl: '%s'",
- collectionName, slice.getName(), coreName, replica.get(BASE_URL_PROP)));
-
- propMap.put(CORE_NAME_PROP, coreName);
- propMap.put(ASYNC, asyncId);
-
- ZkNodeProps m = new ZkNodeProps(propMap);
- log.info("Queueing collection '" + collectionName + "' slice '" + slice.getName() + "' replica '" +
- coreName + "' to become leader.");
- handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here.
- break; // Done with this slice, skip the rest of the replicas.
- }
- if (current.size() == max) {
- log.info("Queued " + max + " leader reassgnments, waiting for some to complete.");
- keepGoing = waitForLeaderChange(current, maxWaitSecs, false, results);
+ insurePreferredIsLeader(req, results, slice, currentRequests);
+ if (currentRequests.size() == max) {
+ log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
+ keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results);
if (keepGoing == false) {
break; // If we've waited longer than specified, don't continue to wait!
}
}
}
if (keepGoing == true) {
- keepGoing = waitForLeaderChange(current, maxWaitSecs, true, results);
+ keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results);
}
if (keepGoing == true) {
log.info("All leader reassignments completed.");
@@ -377,6 +328,166 @@ public class CollectionsHandler extends
rsp.getValues().addAll(results);
}
+  /**
+   * Tries to make the preferred leader of {@code slice} the actual leader.
+   *
+   * Only the first replica carrying the preferred-leader property is considered:
+   * - if it is already the leader, an "alreadyLeaders" entry is recorded in {@code results};
+   * - if it is not active, an "inactivePreferreds" entry is recorded and nothing else is done;
+   * - otherwise it is made the first watcher of the leader (if it isn't already), and the current
+   *   leader is asked to re-join the election at the tail of the queue.
+   *
+   * @param req             the REBALANCELEADERS request; its COLLECTION_PROP parameter names the collection
+   * @param results         accumulates per-replica status entries for the response
+   * @param slice           the slice being rebalanced
+   * @param currentRequests NOTE(review): not read or updated by this method — confirm the caller's
+   *                        batching on its size is still meaningful
+   */
+  private void insurePreferredIsLeader(SolrQueryRequest req, NamedList<Object> results,
+      Slice slice, Map<String, String> currentRequests) throws KeeperException, InterruptedException {
+    final String inactivePreferreds = "inactivePreferreds";
+    final String alreadyLeaders = "alreadyLeaders";
+    String collectionName = req.getParams().get(COLLECTION_PROP);
+
+    for (Replica replica : slice.getReplicas()) {
+      // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
+      if (!replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false)) {
+        continue;
+      }
+      // OK, we are the preferred leader, are we the actual leader?
+      if (replica.getBool(LEADER_PROP, false)) {
+        // We're a preferred leader, but we're _also_ the leader, don't need to do anything.
+        addReplicaStatus(results, alreadyLeaders, slice, replica, "success", "Already leader");
+        return; // already the leader, do nothing.
+      }
+
+      // We're the preferred leader, but someone else is leader. Only become leader if we're active.
+      if (!StringUtils.equalsIgnoreCase(replica.getStr(STATE_PROP), ACTIVE)) {
+        addReplicaStatus(results, inactivePreferreds, slice, replica, "skipped",
+            "Node is a preferredLeader, but it's inactive. Skipping");
+        return; // Don't try to become the leader if we're not active!
+      }
+
+      // Replica is the preferred leader but not the actual leader, do something about that.
+      // "Something" is
+      // 1> if the preferred leader isn't first in line, tell it to re-queue itself.
+      // 2> tell the actual leader to re-queue itself.
+      ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+      List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+          ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+      if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
+        log.warn("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
+            "election queue, but replica " + replica.getName() + " doesn't think it's the leader. Do nothing");
+        return;
+      }
+
+      // Ok, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole
+      // string is used, but that sorts nodes with the same sequence number by their session IDs from ZK.
+      // While this is determinate, it's not quite what we need, so re-queue nodes that aren't us and are
+      // watching the leader node.
+      String firstWatcher = electionNodes.get(1);
+      if (!LeaderElector.getNodeName(firstWatcher).equals(replica.getName())) {
+        makeReplicaFirstWatcher(collectionName, slice, replica);
+      }
+
+      // Ask the node at the head of the queue (the current leader) to re-join at the tail.
+      String leaderElectionNode = electionNodes.get(0);
+      Replica leaderReplica = slice.getReplica(LeaderElector.getNodeName(leaderElectionNode));
+      if (leaderReplica == null) {
+        // The queue head doesn't match any replica known to the cluster state; dereferencing it would NPE.
+        log.warn("Rebalancing leaders: election node " + leaderElectionNode + " in slice " + slice.getName() +
+            " does not correspond to any replica in the cluster state. Do nothing");
+        return;
+      }
+      String coreName = leaderReplica.getStr(CORE_NAME_PROP);
+      rejoinElection(collectionName, slice, leaderElectionNode, coreName, false);
+      if (waitForNodeChange(collectionName, slice, leaderElectionNode) == -1) {
+        log.warn("Rebalancing leaders: timed out waiting for election node " + leaderElectionNode +
+            " in slice " + slice.getName() + " to be re-queued");
+      }
+
+      return; // Done with this slice, skip the rest of the replicas.
+    }
+  }
+
+  /**
+   * Appends a status entry for {@code replica} to the {@code category} list in {@code results},
+   * creating that list on first use.
+   */
+  @SuppressWarnings("unchecked")
+  private static void addReplicaStatus(NamedList<Object> results, String category, Slice slice, Replica replica,
+      String status, String msg) {
+    NamedList<Object> entries = (NamedList<Object>) results.get(category);
+    if (entries == null) {
+      entries = new NamedList<>();
+      results.add(category, entries);
+    }
+    NamedList<Object> res = new NamedList<>();
+    res.add("status", status);
+    res.add("msg", msg);
+    res.add("shard", slice.getName());
+    res.add("nodeName", replica.getNodeName());
+    entries.add(replica.getName(), res);
+  }
+  /**
+   * Puts {@code replica} at the head of the slice's leader election queue, then sends every other
+   * election node that ends up with the same sequence number to the back of the queue.
+   *
+   * If the replica's re-queued election node is not observed before {@code waitForNodeChange} gives up,
+   * nothing further is done.
+   */
+  void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica)
+      throws KeeperException, InterruptedException {
+
+    ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+    String electionPath = ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName());
+    List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+        electionPath);
+
+    // First, queue up the preferred leader at the head of the queue.
+    int newSeq = -1;
+    for (String electionNode : electionNodes) {
+      if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
+        String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
+        rejoinElection(collectionName, slice, electionNode, coreName, true);
+        newSeq = waitForNodeChange(collectionName, slice, electionNode);
+        break;
+      }
+    }
+    if (newSeq == -1) {
+      return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
+    }
+
+    // Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
+    electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(), electionPath);
+
+    for (String thisNode : electionNodes) {
+      int thisSeq = LeaderElector.getSeq(thisNode);
+      if (thisSeq > newSeq) {
+        break; // nodes are sorted by sequence number, so no later node can share ours
+      }
+      if (thisSeq != newSeq || LeaderElector.getNodeName(thisNode).equals(replica.getName())) {
+        continue;
+      }
+      Replica sameSeqReplica = slice.getReplica(LeaderElector.getNodeName(thisNode));
+      if (sameSeqReplica == null) {
+        // Not in the cluster state; rejoinElection would fail dereferencing it.
+        log.warn("Rebalancing leaders: election node " + thisNode + " in slice " + slice.getName() +
+            " does not correspond to any replica in the cluster state, not re-queueing it");
+        continue;
+      }
+      rejoinElection(collectionName, slice, thisNode, sameSeqReplica.getStr(CORE_NAME_PROP), false);
+      waitForNodeChange(collectionName, slice, thisNode);
+    }
+  }
+
+  /**
+   * Polls the slice's leader election queue until the election node belonging to the same node name as
+   * {@code electionNode} appears with a sequence number different from the one in {@code electionNode}.
+   *
+   * Sleeps 100 ms after each unsuccessful poll, for at most 600 polls (at least one minute in total).
+   *
+   * @return the new sequence number, or -1 if no change was observed before giving up
+   */
+  int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException {
+    final int maxPolls = 600;
+    final long pollIntervalMs = 100;
+    String nodeName = LeaderElector.getNodeName(electionNode);
+    int oldSeq = LeaderElector.getSeq(electionNode);
+    // The ZK reader and the election path are the same on every poll, so resolve them once.
+    ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+    String electionPath = ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName());
+    for (int poll = 0; poll < maxPolls; ++poll) {
+      List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+          electionPath);
+      for (String testNode : electionNodes) {
+        if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) {
+          return LeaderElector.getSeq(testNode);
+        }
+      }
+
+      Thread.sleep(pollIntervalMs);
+    }
+    return -1;
+  }
+  /**
+   * Submits a REBALANCELEADERS operation asking the replica that owns {@code electionNode} to re-join
+   * the shard leader election, at the head of the queue when {@code rejoinAtHead} is true and at the
+   * tail otherwise.
+   *
+   * The operation carries a freshly generated async id. The response object passed to handleResponse
+   * is discarded; callers report results themselves.
+   */
+  private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
+      boolean rejoinAtHead) throws KeeperException, InterruptedException {
+    final String operation = REBALANCELEADERS.toLower();
+    Replica target = slice.getReplica(LeaderElector.getNodeName(electionNode));
+
+    Map<String, Object> props = new HashMap<>();
+    props.put(COLLECTION_PROP, collectionName);
+    props.put(SHARD_ID_PROP, slice.getName());
+    props.put(Overseer.QUEUE_OPERATION, operation);
+    props.put(CORE_NAME_PROP, core);
+    props.put(NODE_NAME_PROP, target.getName());
+    props.put(ZkStateReader.BASE_URL_PROP, target.getProperties().get(ZkStateReader.BASE_URL_PROP));
+    props.put(REJOIN_AT_HEAD_PROP, Boolean.toString(rejoinAtHead)); // Get ourselves to be first in line.
+    props.put(ELECTION_NODE_PROP, electionNode);
+
+    String asyncId = operation + "_" + core + "_" + Math.abs(System.nanoTime());
+    props.put(ASYNC, asyncId);
+
+    ZkNodeProps message = new ZkNodeProps(props);
+    handleResponse(operation, message, new SolrQueryResponse()); // response intentionally ignored
+  }
+
// currentAsyncIds - map of request IDs and reporting data (value)
// maxWaitSecs - How long are we going to wait? Defaults to 30 seconds.
// waitForAll - if true, do not return until all assignments have been made.
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Wed Dec 24 22:55:36 2014
@@ -288,6 +288,16 @@ public class CoreAdminHandler extends Re
}
case LOAD:
break;
+
+ case REJOINLEADERELECTION:
+ ZkController zkController = coreContainer.getZkController();
+
+ if (zkController != null) {
+ zkController.rejoinShardLeaderElection(req.getParams());
+ } else {
+ log.warn("zkController is null in CoreAdminHandler.handleRequestInternal:REJOINLEADERELCTIONS. No action taken.");
+ }
+ break;
}
}
rsp.setHttpCaching(false);
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java Wed Dec 24 22:55:36 2014
@@ -230,7 +230,7 @@ public class LeaderElectionTest extends
props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
"http://127.0.0.1/solr/", ZkStateReader.CORE_NAME_PROP, "2");
ElectionContext context = new ShardLeaderElectionContextBase(second,
- "slice1", "collection2", "dummynode1", props, zkStateReader);
+ "slice1", "collection2", "dummynode2", props, zkStateReader);
second.setup(context);
second.joinElection(context, false);
Thread.sleep(1000);
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java?rev=1647857&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java Wed Dec 24 22:55:36 2014
@@ -0,0 +1,340 @@
+package org.apache.solr.cloud;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Before;
+
+
+public class TestRebalanceLeaders extends AbstractFullDistribZkTestBase {
+
+ public static final String COLLECTION_NAME = "testcollection";
+
+ public TestRebalanceLeaders() {
+ schemaString = "schema15.xml"; // we need a string id
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ fixShardCount = true;
+ sliceCount = 4;
+ shardCount = 4;
+ super.setUp();
+ }
+
+ int reps = 10;
+ int timeoutMs = 60000;
+ Map<String, List<Replica>> initial = new HashMap<>();
+
+ Map<String, Replica> expected = new HashMap<>();
+
+
+ @Override
+ public void doTest() throws Exception {
+ CloudSolrServer client = createCloudClient(null);
+ reps = random().nextInt(9) + 1; // make sure and do at least one.
+ try {
+ // Mix up a bunch of different combinations of shards and replicas in order to exercise boundary cases.
+ // shards, replicationfactor, maxreplicaspernode
+ int shards = random().nextInt(7);
+ if (shards < 2) shards = 2;
+ int rFactor = random().nextInt(4);
+ if (rFactor < 2) rFactor = 2;
+ createCollection(null, COLLECTION_NAME, shards, rFactor, shards * rFactor + 1, client, null, "conf1");
+ } finally {
+ //remove collections
+ client.shutdown();
+ }
+
+ waitForCollection(cloudClient.getZkStateReader(), COLLECTION_NAME, 2);
+ waitForRecoveriesToFinish(COLLECTION_NAME, false);
+
+ listCollection();
+
+ rebalanceLeaderTest();
+ }
+
+ private void listCollection() throws IOException, SolrServerException {
+ //CloudSolrServer client = createCloudClient(null);
+ try {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set("action", CollectionParams.CollectionAction.LIST.toString());
+ SolrRequest request = new QueryRequest(params);
+ request.setPath("/admin/collections");
+
+ NamedList<Object> rsp = cloudClient.request(request);
+ List<String> collections = (List<String>) rsp.get("collections");
+ assertTrue("control_collection was not found in list", collections.contains("control_collection"));
+ assertTrue(DEFAULT_COLLECTION + " was not found in list", collections.contains(DEFAULT_COLLECTION));
+ assertTrue(COLLECTION_NAME + " was not found in list", collections.contains(COLLECTION_NAME));
+ } finally {
+ //remove collections
+ //client.shutdown();
+ }
+ }
+
+ void recordInitialState() throws InterruptedException {
+ Map<String, Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getSlicesMap();
+
+ // Assemble a list of all the replicas for all the shards in a convenient way to look at them.
+ for (Map.Entry<String, Slice> ent : slices.entrySet()) {
+ initial.put(ent.getKey(), new ArrayList<>(ent.getValue().getReplicas()));
+ }
+ }
+
+ void rebalanceLeaderTest() throws InterruptedException, IOException, SolrServerException, KeeperException {
+ recordInitialState();
+ for (int idx = 0; idx < reps; ++idx) {
+ issueCommands();
+ checkConsistency();
+ }
+ }
+
+ // After we've called the rebalance command, we want to insure that:
+ // 1> all replicas appear once and only once in the respective leader election queue
+ // 2> All the replicas we _think_ are leaders are in the 0th position in the leader election queue.
+ // 3> The node that ZooKeeper thinks is the leader is the one we think should be the leader.
+ void checkConsistency() throws InterruptedException, KeeperException {
+ long start = System.currentTimeMillis();
+
+ while ((System.currentTimeMillis() - start) < timeoutMs) {
+ if (checkAppearOnce() &&
+ checkElectionZero() &&
+ checkZkLeadersAgree()) {
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ fail("Checking the rebalance leader command failed");
+ }
+
+
+ // Do all the nodes appear exactly once in the leader election queue and vice-versa?
+ Boolean checkAppearOnce() throws KeeperException, InterruptedException {
+
+ for (Map.Entry<String, List<Replica>> ent : initial.entrySet()) {
+ List<String> leaderQueue = cloudClient.getZkStateReader().getZkClient().getChildren("/collections/" + COLLECTION_NAME +
+ "/leader_elect/" + ent.getKey() + "/election", null, true);
+
+ if (leaderQueue.size() != ent.getValue().size()) {
+ return false;
+ }
+ // Check that each election node has a corresponding replica.
+ for (String electionNode : leaderQueue) {
+ if (checkReplicaName(LeaderElector.getNodeName(electionNode), ent.getValue())) {
+ continue;
+ }
+ return false;
+ }
+ // Check that each replica has an election node.
+ for (Replica rep : ent.getValue()) {
+ if (checkElectionNode(rep.getName(), leaderQueue)) {
+ continue;
+ }
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Check that the given name is in the leader election queue
+ Boolean checkElectionNode(String repName, List<String> leaderQueue) {
+ for (String electionNode : leaderQueue) {
+ if (repName.equals(LeaderElector.getNodeName(electionNode))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Check that the name passed in corresponds to a replica.
+ Boolean checkReplicaName(String toCheck, List<Replica> replicas) {
+ for (Replica rep : replicas) {
+ if (toCheck.equals(rep.getName())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Get the shard leader election from ZK and sort it. The node may not actually be there, so retry
+ List<String> getOverseerSort(String key) {
+ List<String> ret = null;
+ try {
+ ret = OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
+ "/collections/" + COLLECTION_NAME + "/leader_elect/" + key + "/election");
+ return ret;
+ } catch (KeeperException e) {
+ cloudClient.connect();
+ } catch (InterruptedException e) {
+ return null;
+ }
+ return null;
+ }
+
+ // Is every node we think is the leader in the zeroth position in the leader election queue?
+ Boolean checkElectionZero() {
+ for (Map.Entry<String, Replica> ent : expected.entrySet()) {
+
+ List<String> leaderQueue = getOverseerSort(ent.getKey());
+ if (leaderQueue == null) return false;
+
+ String electName = LeaderElector.getNodeName(leaderQueue.get(0));
+ String coreName = ent.getValue().getName();
+ if (electName.equals(coreName) == false) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Do who we _think_ should be the leader agree with the leader nodes?
+ Boolean checkZkLeadersAgree() throws KeeperException, InterruptedException {
+ for (Map.Entry<String, Replica> ent : expected.entrySet()) {
+
+ String path = "/collections/" + COLLECTION_NAME + "/leaders/" + ent.getKey();
+ byte[] data = getZkData(cloudClient, path);
+ if (data == null) return false;
+
+ String repCore = null;
+ String zkCore = null;
+
+ if (data == null) {
+ return false;
+ } else {
+ Map m = (Map) ZkStateReader.fromJSON(data);
+ zkCore = (String) m.get("core");
+ repCore = ent.getValue().getStr("core");
+ if (zkCore.equals(repCore) == false) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ byte[] getZkData(CloudSolrServer server, String path) {
+ org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
+ long start = System.currentTimeMillis();
+ try {
+ byte[] data = server.getZkStateReader().getZkClient().getData(path, null, stat, true);
+ if (data != null) {
+ return data;
+ }
+ } catch (KeeperException.NoNodeException e) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ return null;
+ }
+ } catch (InterruptedException | KeeperException e) {
+ return null;
+ }
+ return null;
+ }
+
+ // It's OK not to check the return here since the subsequent tests will fail.
+ void issueCommands() throws IOException, SolrServerException, KeeperException, InterruptedException {
+
+ // Find a replica to make the preferredLeader. NOTE: may be one that's _already_ leader!
+ expected.clear();
+ for (Map.Entry<String, List<Replica>> ent : initial.entrySet()) {
+ List<Replica> replicas = ent.getValue();
+ Replica rep = replicas.get(Math.abs(random().nextInt()) % replicas.size());
+ expected.put(ent.getKey(), rep);
+ issuePreferred(ent.getKey(), rep);
+ }
+
+ if (waitForAllPreferreds() == false) {
+ fail("Waited for timeout for preferredLeader assignments to be made and they werent.");
+ }
+ //fillExpectedWithCurrent();
+ // Now rebalance the leaders
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString());
+
+ // Insure we get error returns when omitting required parameters
+ params.set("collection", COLLECTION_NAME);
+ params.set("maxAtOnce", "10");
+ SolrRequest request = new QueryRequest(params);
+ request.setPath("/admin/collections");
+ cloudClient.request(request);
+ }
+
+ void issuePreferred(String slice, Replica rep) throws IOException, SolrServerException, InterruptedException {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set("action", CollectionParams.CollectionAction.ADDREPLICAPROP.toString());
+
+ // Insure we get error returns when omitting required parameters
+
+ params.set("collection", COLLECTION_NAME);
+ params.set("shard", slice);
+ params.set("replica", rep.getName());
+ params.set("property", "preferredLeader");
+ params.set("property.value", "true");
+
+ SolrRequest request = new QueryRequest(params);
+ request.setPath("/admin/collections");
+ cloudClient.request(request);
+ }
+
+ boolean waitForAllPreferreds() throws KeeperException, InterruptedException {
+ boolean goAgain = true;
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < timeoutMs) {
+ goAgain = false;
+ cloudClient.getZkStateReader().updateClusterState(true);
+ Map<String, Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getSlicesMap();
+
+ for (Map.Entry<String, Replica> ent : expected.entrySet()) {
+ Replica me = slices.get(ent.getKey()).getReplica(ent.getValue().getName());
+ if (me.getBool("property.preferredleader", false) == false) {
+ goAgain = true;
+ break;
+ }
+ }
+ if (goAgain) {
+ Thread.sleep(250);
+ } else {
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
+
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Wed Dec 24 22:55:36 2014
@@ -62,6 +62,7 @@ public class ZkStateReader implements Cl
public static final String STATE_PROP = "state";
public static final String CORE_NAME_PROP = "core";
public static final String COLLECTION_PROP = "collection";
+ public static final String ELECTION_NODE_PROP = "election_node";
public static final String SHARD_ID_PROP = "shard";
public static final String REPLICA_PROP = "replica";
public static final String SHARD_RANGE_PROP = "shard_range";
@@ -78,6 +79,7 @@ public class ZkStateReader implements Cl
public static final String ALIASES = "/aliases.json";
public static final String CLUSTER_STATE = "/clusterstate.json";
public static final String CLUSTER_PROPS = "/clusterprops.json";
+ public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
public static final String REPLICATION_FACTOR = "replicationFactor";
public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
@@ -102,9 +104,10 @@ public class ZkStateReader implements Cl
private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000"));
- public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
+ public static final String LEADER_ELECT_ZKNODE = "leader_elect";
public static final String SHARD_LEADERS_ZKNODE = "leaders";
+ public static final String ELECTION_NODE = "election";
private final Set<String> watchedCollections = new HashSet<String>();
@@ -658,6 +661,16 @@ public class ZkStateReader implements Cl
: "");
}
+ /**
+ * Get path where shard leader elections ephemeral nodes are.
+ */
+ public static String getShardLeadersElectPath(String collection, String shardId) {
+ return COLLECTIONS_ZKNODE + "/" + collection + "/"
+ + LEADER_ELECT_ZKNODE + (shardId != null ? ("/" + shardId + "/" + ELECTION_NODE)
+ : "");
+ }
+
+
public List<ZkCoreNodeProps> getReplicaProps(String collection,
String shardId, String thisCoreNodeName) {
return getReplicaProps(collection, shardId, thisCoreNodeName, null);
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java Wed Dec 24 22:55:36 2014
@@ -135,7 +135,8 @@ public abstract class CoreAdminParams
LOAD_ON_STARTUP,
TRANSIENT,
OVERSEEROP,
- REQUESTSTATUS;
+ REQUESTSTATUS,
+ REJOINLEADERELECTION;
public static CoreAdminAction get( String p )
{