You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2015/09/01 18:13:39 UTC
svn commit: r1700603 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/
core/src/java/org/apache/solr/handler/admin/
core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/common/cloud/ solrj/src/java/org/apache/solr/common/util/
Author: markrmiller
Date: Tue Sep 1 16:13:38 2015
New Revision: 1700603
URL: http://svn.apache.org/r1700603
Log:
SOLR-7844: Zookeeper session expiry during shard leader election can cause multiple leaders.
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/RetryUtil.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Tue Sep 1 16:13:38 2015
@@ -181,6 +181,9 @@ Bug Fixes
* SOLR-7988: SolrJ could not make requests to handlers with '/admin/' prefix (noble , ludovic Boutros)
+* SOLR-7844: Zookeeper session expiry during shard leader election can cause multiple leaders.
+ (Mike Roberts, Mark Miller, Jessica Cheng)
+
Optimizations
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Tue Sep 1 16:13:38 2015
@@ -2,10 +2,12 @@ package org.apache.solr.cloud;
import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.fs.Path;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
@@ -30,6 +32,11 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.OpResult.SetDataResult;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,16 +80,16 @@ public abstract class ElectionContext im
}
public void cancelElection() throws InterruptedException, KeeperException {
- if( leaderSeqPath != null ){
+ if (leaderSeqPath != null) {
try {
- log.info("canceling election {}",leaderSeqPath );
+ log.info("Canceling election {}", leaderSeqPath);
zkClient.delete(leaderSeqPath, -1, true);
} catch (NoNodeException e) {
// fine
- log.warn("cancelElection did not find election node to remove {}" ,leaderSeqPath);
+ log.info("cancelElection did not find election node to remove {}", leaderSeqPath);
}
} else {
- log.warn("cancelElection skipped as this context has not been initialized");
+ log.info("cancelElection skipped as this context has not been initialized");
}
}
@@ -104,6 +111,7 @@ class ShardLeaderElectionContextBase ext
protected String shardId;
protected String collection;
protected LeaderElector leaderElector;
+ protected volatile Integer leaderZkNodeParentVersion;
public ShardLeaderElectionContextBase(LeaderElector leaderElector,
final String shardId, final String collection, final String coreNodeName,
@@ -129,25 +137,81 @@ class ShardLeaderElectionContextBase ext
}
@Override
+ public void cancelElection() throws InterruptedException, KeeperException {
+ if (leaderZkNodeParentVersion != null) {
+ try {
+ // We need to be careful and make sure we *only* delete our own leader registration node.
+ // We do this by using a multi and ensuring the parent znode of the leader registration node
+ // matches the version we expect - there is a setData call that increments the parent's znode
+ // version whenever a leader registers.
+ log.info("Removing leader registration node on cancel: {} {}", leaderPath, leaderZkNodeParentVersion);
+ List<Op> ops = new ArrayList<>(2);
+ ops.add(Op.check(new Path(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
+ ops.add(Op.delete(leaderPath, -1));
+ zkClient.multi(ops, true);
+ } catch (KeeperException.NoNodeException nne) {
+ // no problem
+ log.info("No leader registration node found to remove: {}", leaderPath);
+ } catch (KeeperException.BadVersionException bve) {
+ log.info("Cannot remove leader registration node because the current registered node is not ours: {}", leaderPath);
+ // no problem
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ SolrException.log(log, e);
+ }
+ leaderZkNodeParentVersion = null;
+ } else {
+ log.info("No version found for ephemeral leader parent node, won't remove previous leader registration.");
+ }
+ super.cancelElection();
+ }
+
+ @Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
throws KeeperException, InterruptedException, IOException {
- // register as leader - if an ephemeral is already there, wait just a bit
- // to see if it goes away
+ // register as leader - if an ephemeral is already there, wait to see if it goes away
+ String parent = new Path(leaderPath).getParent().toString();
+ ZkCmdExecutor zcmd = new ZkCmdExecutor(30000);
+ zcmd.ensureExists(parent, zkClient);
+
try {
- RetryUtil.retryOnThrowable(NodeExistsException.class, 15000, 1000,
- new RetryCmd() {
- @Override
- public void execute() throws Throwable {
- zkClient.makePath(leaderPath, Utils.toJSON(leaderProps), CreateMode.EPHEMERAL, true);
+ RetryUtil.retryOnThrowable(NodeExistsException.class, 60000, 5000, new RetryCmd() {
+
+ @Override
+ public void execute() throws InterruptedException, KeeperException {
+ log.info("Creating leader registration node", leaderPath);
+ List<Op> ops = new ArrayList<>(2);
+
+ // We use a multi operation to get the parent nodes version, which will
+ // be used to make sure we only remove our own leader registration node.
+ // The setData call used to get the parent version is also the trigger to
+ // increment the version. We also do a sanity check that our leaderSeqPath exists.
+
+ ops.add(Op.check(leaderSeqPath, -1));
+ ops.add(Op.create(leaderPath, Utils.toJSON(leaderProps), zkClient.getZkACLProvider().getACLsToAdd(leaderPath), CreateMode.EPHEMERAL));
+ ops.add(Op.setData(parent, null, -1));
+ List<OpResult> results;
+
+ results = zkClient.multi(ops, true);
+
+ for (OpResult result : results) {
+ if (result.getType() == ZooDefs.OpCode.setData) {
+ SetDataResult dresult = (SetDataResult) result;
+ Stat stat = dresult.getStat();
+ leaderZkNodeParentVersion = stat.getVersion();
+ return;
}
}
- );
+ assert leaderZkNodeParentVersion != null;
+ }
+ });
} catch (Throwable t) {
if (t instanceof OutOfMemoryError) {
throw (OutOfMemoryError) t;
}
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed", t);
- }
+ }
assert shardId != null;
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION,
@@ -158,6 +222,10 @@ class ShardLeaderElectionContextBase ext
leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
Overseer.getInQueue(zkClient).offer(Utils.toJSON(m));
+ }
+
+ public LeaderElector getLeaderElector() {
+ return leaderElector;
}
}
@@ -203,7 +271,6 @@ final class ShardLeaderElectionContext e
ActionThrottle lt;
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
- cancelElection();
throw new SolrException(ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getCoreNames());
}
MDCLoggingContext.setCore(core);
@@ -225,6 +292,13 @@ final class ShardLeaderElectionContext e
waitForReplicasToComeUp(leaderVoteWait);
}
+ if (isClosed) {
+ // Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the latter,
+ // we cannot be sure we are still the leader, so we should bail out. The OnReconnect handler will
+ // re-register the cores and handle a new leadership election.
+ return;
+ }
+
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
@@ -312,34 +386,38 @@ final class ShardLeaderElectionContext e
}
boolean isLeader = true;
- try {
- super.runLeaderProcess(weAreReplacement, 0);
- } catch (Exception e) {
- isLeader = false;
- SolrException.log(log, "There was a problem trying to register as the leader", e);
-
- try (SolrCore core = cc.getCore(coreName)) {
+ if (!isClosed) {
+ try {
+ super.runLeaderProcess(weAreReplacement, 0);
+ } catch (Exception e) {
+ isLeader = false;
+ SolrException.log(log, "There was a problem trying to register as the leader", e);
- if (core == null) {
- log.debug("SolrCore not found:" + coreName + " in " + cc.getCoreNames());
- return;
+ try (SolrCore core = cc.getCore(coreName)) {
+
+ if (core == null) {
+ log.debug("SolrCore not found:" + coreName + " in " + cc.getCoreNames());
+ return;
+ }
+
+ core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
+
+ // we could not publish ourselves as leader - try and rejoin election
+ rejoinLeaderElection(core);
}
-
- core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
-
- // we could not publish ourselves as leader - try and rejoin election
- rejoinLeaderElection(core);
}
- }
-
- if (isLeader) {
- // check for any replicas in my shard that were set to down by the previous leader
- try {
- startLeaderInitiatedRecoveryOnReplicas(coreName);
- } catch (Exception exc) {
- // don't want leader election to fail because of
- // an error trying to tell others to recover
+
+ if (isLeader) {
+ // check for any replicas in my shard that were set to down by the previous leader
+ try {
+ startLeaderInitiatedRecoveryOnReplicas(coreName);
+ } catch (Exception exc) {
+ // don't want leader election to fail because of
+ // an error trying to tell others to recover
+ }
}
+ } else {
+ cancelElection();
}
} finally {
MDCLoggingContext.clear();
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Tue Sep 1 16:13:38 2015
@@ -22,10 +22,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
+import org.apache.solr.cloud.ZkController.ContextKey;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -70,11 +72,21 @@ public class LeaderElector {
private ElectionWatcher watcher;
+ private Map<ContextKey,ElectionContext> electionContexts;
+ private ContextKey contextKey;
+
public LeaderElector(SolrZkClient zkClient) {
this.zkClient = zkClient;
zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
}
+ public LeaderElector(SolrZkClient zkClient, ContextKey key, Map<ContextKey,ElectionContext> electionContexts) {
+ this.zkClient = zkClient;
+ zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
+ this.electionContexts = electionContexts;
+ this.contextKey = key;
+ }
+
public ElectionContext getContext() {
return context;
}
@@ -140,20 +152,6 @@ public class LeaderElector {
retryElection(context, false);//join at the tail again
return;
}
- // first we delete the node advertising the old leader in case the ephem is still there
- try {
- zkClient.delete(context.leaderPath, -1, true);
- }catch (KeeperException.NoNodeException nne){
- //no problem
- }catch (InterruptedException e){
- throw e;
- } catch (Exception e) {
- //failed to delete the leader node
- log.error("leader elect delete error",e);
- retryElection(context, false);
- return;
- // fine
- }
try {
runIamLeaderProcess(context, replacement);
@@ -423,6 +421,9 @@ public class LeaderElector {
void retryElection(ElectionContext context, boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
ElectionWatcher watcher = this.watcher;
ElectionContext ctx = context.copy();
+ if (electionContexts != null) {
+ electionContexts.put(contextKey, ctx);
+ }
if (watcher != null) watcher.cancel();
this.context.cancelElection();
this.context = ctx;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java Tue Sep 1 16:13:38 2015
@@ -97,7 +97,7 @@ import static org.apache.solr.common.clo
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
@@ -309,7 +309,7 @@ public class OverseerCollectionMessageHa
@SuppressWarnings("unchecked")
private void processRebalanceLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
- NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
+ CORE_NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COLLECTION_PROP, message.getStr(COLLECTION_PROP));
@@ -317,7 +317,7 @@ public class OverseerCollectionMessageHa
params.set(REJOIN_AT_HEAD_PROP, message.getStr(REJOIN_AT_HEAD_PROP));
params.set(CoreAdminParams.ACTION, CoreAdminAction.REJOINLEADERELECTION.toString());
params.set(CORE_NAME_PROP, message.getStr(CORE_NAME_PROP));
- params.set(NODE_NAME_PROP, message.getStr(NODE_NAME_PROP));
+ params.set(CORE_NODE_NAME_PROP, message.getStr(CORE_NODE_NAME_PROP));
params.set(ELECTION_NODE_PROP, message.getStr(ELECTION_NODE_PROP));
params.set(BASE_URL_PROP, message.getStr(BASE_URL_PROP));
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Tue Sep 1 16:13:38 2015
@@ -170,8 +170,6 @@ public final class ZkController {
private final ZkCmdExecutor cmdExecutor;
private final ZkStateReader zkStateReader;
- private final LeaderElector leaderElector;
-
private final String zkServerAddress; // example: 127.0.0.1:54062/solr
private final int localHostPort; // example: 54065
@@ -372,6 +370,7 @@ public final class ZkController {
} catch (Exception e) {
log.error("Error trying to stop any Overseer threads", e);
}
+ closeOutstandingElections(registerOnReconnect);
markAllAsNotLeader(registerOnReconnect);
}
}, zkACLProvider);
@@ -383,7 +382,6 @@ public final class ZkController {
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
cmdExecutor = new ZkCmdExecutor(clientTimeout);
- leaderElector = new LeaderElector(zkClient);
zkStateReader = new ZkStateReader(zkClient, new Runnable() {
@Override
public void run() {
@@ -480,6 +478,32 @@ public final class ZkController {
}
}
}
+
+ private void closeOutstandingElections(final CurrentCoreDescriptorProvider registerOnReconnect) {
+
+ List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
+ if (descriptors != null) {
+ for (CoreDescriptor descriptor : descriptors) {
+ closeExistingElectionContext(descriptor);
+ }
+ }
+ }
+
+ private ContextKey closeExistingElectionContext(CoreDescriptor cd) {
+ // look for old context - if we find it, cancel it
+ String collection = cd.getCloudDescriptor().getCollectionName();
+ final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+
+ ContextKey contextKey = new ContextKey(collection, coreNodeName);
+ ElectionContext prevContext = electionContexts.get(contextKey);
+
+ if (prevContext != null) {
+ prevContext.close();
+ electionContexts.remove(contextKey);
+ }
+
+ return contextKey;
+ }
private void markAllAsNotLeader(
final CurrentCoreDescriptorProvider registerOnReconnect) {
@@ -1068,11 +1092,12 @@ public final class ZkController {
props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+ props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
ZkNodeProps ourProps = new ZkNodeProps(props);
-
+ LeaderElector leaderElector = new LeaderElector(zkClient, contextKey, electionContexts);
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
collection, coreNodeName, ourProps, this, cc);
@@ -1876,23 +1901,36 @@ public final class ZkController {
public void rejoinShardLeaderElection(SolrParams params) {
try {
+
String collectionName = params.get(COLLECTION_PROP);
String shardId = params.get(SHARD_ID_PROP);
- String nodeName = params.get(NODE_NAME_PROP);
+ String coreNodeName = params.get(CORE_NODE_NAME_PROP);
String coreName = params.get(CORE_NAME_PROP);
String electionNode = params.get(ELECTION_NODE_PROP);
String baseUrl = params.get(BASE_URL_PROP);
- ZkNodeProps zkProps = new ZkNodeProps(CORE_NAME_PROP, coreName, NODE_NAME_PROP, nodeName, COLLECTION_PROP, collectionName,
- SHARD_ID_PROP, shardId, ELECTION_NODE_PROP, electionNode, BASE_URL_PROP, baseUrl);
-
- ShardLeaderElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId, collectionName,
- nodeName, zkProps, this, getCoreContainer());
- LeaderElector elect = new LeaderElector(this.zkClient);
- context.leaderSeqPath = context.electionPath + LeaderElector.ELECTION_NODE + "/" + electionNode;
- elect.setup(context);
-
- elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP));
+ try (SolrCore core = cc.getCore(coreName)) {
+ MDCLoggingContext.setCore(core);
+
+ log.info("Rejoin the shard leader election.");
+
+ ContextKey contextKey = new ContextKey(collectionName, coreNodeName);
+
+ ElectionContext prevContext = electionContexts.get(contextKey);
+ if (prevContext != null) prevContext.cancelElection();
+
+ ZkNodeProps zkProps = new ZkNodeProps(BASE_URL_PROP, baseUrl, CORE_NAME_PROP, coreName, NODE_NAME_PROP, getNodeName(), CORE_NODE_NAME_PROP, coreNodeName);
+
+ LeaderElector elect = ((ShardLeaderElectionContextBase) prevContext).getLeaderElector();
+ ShardLeaderElectionContext context = new ShardLeaderElectionContext(elect, shardId, collectionName,
+ coreNodeName, zkProps, this, getCoreContainer());
+
+ context.leaderSeqPath = context.electionPath + LeaderElector.ELECTION_NODE + "/" + electionNode;
+ elect.setup(context);
+ electionContexts.put(contextKey, context);
+
+ elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP));
+ }
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java Tue Sep 1 16:13:38 2015
@@ -98,7 +98,7 @@ class CdcrLeaderStateManager extends Cdc
private String getZnodePath() {
String myShardId = core.getCoreDescriptor().getCloudDescriptor().getShardId();
String myCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
- return "/collections/" + myCollection + "/leaders/" + myShardId;
+ return "/collections/" + myCollection + "/leaders/" + myShardId + "/leader";
}
void setAmILeader(boolean amILeader) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Tue Sep 1 16:13:38 2015
@@ -308,7 +308,7 @@ public class CoreAdminHandler extends Re
if (zkController != null) {
zkController.rejoinShardLeaderElection(req.getParams());
} else {
- log.warn("zkController is null in CoreAdminHandler.handleRequestInternal:REJOINLEADERELCTIONS. No action taken.");
+ log.warn("zkController is null in CoreAdminHandler.handleRequestInternal:REJOINLEADERELECTION. No action taken.");
}
break;
case INVOKE:
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java Tue Sep 1 16:13:38 2015
@@ -1,5 +1,18 @@
package org.apache.solr.handler.admin;
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -39,21 +52,12 @@ import org.apache.solr.core.CoreContaine
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.zookeeper.KeeperException;
-
-import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class RebalanceLeaders {
+ private static Logger log = LoggerFactory.getLogger(RebalanceLeaders.class);
+
final SolrQueryRequest req;
final SolrQueryResponse rsp;
final CollectionsHandler collectionsHandler;
@@ -72,7 +76,7 @@ class RebalanceLeaders {
String collectionName = req.getParams().get(COLLECTION_PROP);
if (StringUtils.isBlank(collectionName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the REASSIGNLEADERS command."));
+ String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the Rebalance Leaders command."));
}
coreContainer.getZkController().getZkStateReader().updateClusterState();
ClusterState clusterState = coreContainer.getZkController().getClusterState();
@@ -88,9 +92,9 @@ class RebalanceLeaders {
boolean keepGoing = true;
for (Slice slice : dc.getSlices()) {
- insurePreferredIsLeader(results, slice, currentRequests);
+ ensurePreferredIsLeader(results, slice, currentRequests);
if (currentRequests.size() == max) {
- CollectionsHandler.log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
+ log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results);
if (keepGoing == false) {
break; // If we've waited longer than specified, don't continue to wait!
@@ -101,15 +105,15 @@ class RebalanceLeaders {
keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results);
}
if (keepGoing == true) {
- CollectionsHandler.log.info("All leader reassignments completed.");
+ log.info("All leader reassignments completed.");
} else {
- CollectionsHandler.log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
+ log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
}
rsp.getValues().addAll(results);
}
- private void insurePreferredIsLeader(NamedList<Object> results,
+ private void ensurePreferredIsLeader(NamedList<Object> results,
Slice slice, Map<String, String> currentRequests) throws KeeperException, InterruptedException {
final String inactivePreferreds = "inactivePreferreds";
final String alreadyLeaders = "alreadyLeaders";
@@ -164,8 +168,8 @@ class RebalanceLeaders {
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
- CollectionsHandler.log.warn("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
- "election queue, but replica " + replica.getName() + " doesn't think it's the leader. Do nothing");
+ log.info("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
+ "election queue, but replica " + replica.getName() + " doesn't think it's the leader.");
return;
}
@@ -210,10 +214,6 @@ class RebalanceLeaders {
return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
}
- List<String> electionNodesTmp = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
- ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
-
-
// Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
@@ -250,6 +250,7 @@ class RebalanceLeaders {
}
return -1;
}
+
private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
boolean rejoinAtHead) throws KeeperException, InterruptedException {
Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode));
@@ -258,7 +259,7 @@ class RebalanceLeaders {
propMap.put(SHARD_ID_PROP, slice.getName());
propMap.put(QUEUE_OPERATION, REBALANCELEADERS.toLower());
propMap.put(CORE_NAME_PROP, core);
- propMap.put(NODE_NAME_PROP, replica.getName());
+ propMap.put(CORE_NODE_NAME_PROP, replica.getName());
propMap.put(ZkStateReader.BASE_URL_PROP, replica.getProperties().get(ZkStateReader.BASE_URL_PROP));
propMap.put(REJOIN_AT_HEAD_PROP, Boolean.toString(rejoinAtHead)); // Get ourselves to be first in line.
propMap.put(ELECTION_NODE_PROP, electionNode);
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java Tue Sep 1 16:13:38 2015
@@ -16,6 +16,13 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -29,17 +36,12 @@ import org.apache.solr.common.util.Utils
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestRebalanceLeaders extends AbstractFullDistribZkTestBase {
-
+ private static Logger log = LoggerFactory.getLogger(TestRebalanceLeaders.class);
public static final String COLLECTION_NAME = "testcollection";
public TestRebalanceLeaders() {
@@ -71,7 +73,6 @@ public class TestRebalanceLeaders extend
waitForRecoveriesToFinish(COLLECTION_NAME, false);
listCollection();
-
rebalanceLeaderTest();
}
@@ -117,16 +118,21 @@ public class TestRebalanceLeaders extend
// 3> The node that ZooKeeper thinks is the leader is the one we think should be the leader.
void checkConsistency() throws InterruptedException, KeeperException {
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS);
-
- while (! timeout.hasTimedOut()) {
- if (checkAppearOnce() &&
- checkElectionZero() &&
- checkZkLeadersAgree()) {
+ // Evaluate every check on each pass (no short-circuit) and keep the latest results,
+ // so the failure message below can report which checks were still failing at timeout.
+ boolean checkAppearOnce = false;
+ boolean checkElectionZero = false;
+ boolean checkZkLeadersAgree = false;
+ while (!timeout.hasTimedOut()) {
+ checkAppearOnce = checkAppearOnce();
+ checkElectionZero = checkElectionZero();
+ checkZkLeadersAgree = checkZkLeadersAgree();
+ if (checkAppearOnce && checkElectionZero && checkZkLeadersAgree) {
return;
}
Thread.sleep(1000);
}
- fail("Checking the rebalance leader command failed");
+
+ fail("Checking the rebalance leader command failed, checkAppearOnce=" + checkAppearOnce + " checkElectionZero="
+ + checkElectionZero + " checkZkLeadersAgree=" + checkZkLeadersAgree);
}
@@ -211,25 +217,26 @@ public class TestRebalanceLeaders extend
// Do who we _think_ should be the leader agree with the leader nodes?
Boolean checkZkLeadersAgree() throws KeeperException, InterruptedException {
- for (Map.Entry<String, Replica> ent : expected.entrySet()) {
-
- String path = "/collections/" + COLLECTION_NAME + "/leaders/" + ent.getKey();
+ for (Map.Entry<String,Replica> ent : expected.entrySet()) {
+
+ String path = "/collections/" + COLLECTION_NAME + "/leaders/" + ent.getKey() + "/leader";
byte[] data = getZkData(cloudClient, path);
- if (data == null) return false;
-
+ if (data == null) {
+ log.warn("path to check not found {}", path);
+ return false;
+ }
+
String repCore = null;
String zkCore = null;
-
- if (data == null) {
+
+ Map m = (Map) Utils.fromJSON(data);
+ zkCore = (String) m.get("core");
+ repCore = ent.getValue().getStr("core");
+ // zkCore is null when the leader props carry no "core" entry; report "not agreed" (so
+ // checkConsistency keeps retrying) instead of failing the whole test with an NPE.
+ if (zkCore == null || zkCore.equals(repCore) == false) {
+ log.warn("leader in zk does not match what we expect: {} != {}", zkCore, repCore);
return false;
- } else {
- Map m = (Map) Utils.fromJSON(data);
- zkCore = (String) m.get("core");
- repCore = ent.getValue().getStr("core");
- if (zkCore.equals(repCore) == false) {
- return false;
- }
}
+
}
return true;
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Tue Sep 1 16:13:38 2015
@@ -643,7 +643,7 @@ public class ZkStateReader implements Cl
+ /**
+ * Returns the ZK path of a shard's leader node:
+ * {@code COLLECTIONS_ZKNODE/<collection>/SHARD_LEADERS_ZKNODE/<shardId>/leader}.
+ * NOTE(review): with a null {@code shardId} this now yields {@code .../SHARD_LEADERS_ZKNODE/leader}
+ * rather than the leaders directory itself -- confirm no caller passes null.
+ */
public static String getShardLeadersPath(String collection, String shardId) {
return COLLECTIONS_ZKNODE + "/" + collection + "/"
+ SHARD_LEADERS_ZKNODE + (shardId != null ? ("/" + shardId)
- : "");
+ : "") + "/leader";
}
/**
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/RetryUtil.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/RetryUtil.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/RetryUtil.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/RetryUtil.java Tue Sep 1 16:13:38 2015
@@ -1,5 +1,8 @@
package org.apache.solr.common.util;
+import java.util.Collections;
+import java.util.Set;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -19,18 +22,34 @@ package org.apache.solr.common.util;
import java.util.concurrent.TimeUnit;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class RetryUtil {
+ private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+ /** Action run by {@code retryOnThrowable}; throwing an instance of a retryable class triggers another attempt. */
public static interface RetryCmd {
public void execute() throws Throwable;
}
+ /** Condition polled by {@code retryOnBoolean}: return true on success, false to try again. */
+ public static interface BooleanRetryCmd {
+ public boolean execute();
+ }
+
+ /**
+ * Single-class convenience overload: re-runs {@code cmd}, pausing {@code intervalms} between
+ * attempts, while it throws an instance of {@code clazz} and {@code timeoutms} has not elapsed.
+ * Delegates to the {@code Set<Class>} variant.
+ */
public static void retryOnThrowable(Class clazz, long timeoutms, long intervalms, RetryCmd cmd) throws Throwable {
+ retryOnThrowable(Collections.singleton(clazz), timeoutms, intervalms, cmd);
+ }
+
+ public static void retryOnThrowable(Set<Class> classes, long timeoutms, long intervalms, RetryCmd cmd) throws Throwable {
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
while (true) {
try {
cmd.execute();
} catch (Throwable t) {
- if (clazz.isInstance(t) && System.nanoTime() < timeout) {
+ if (isInstanceOf(classes, t) && System.nanoTime() < timeout) {
+ log.info("Retry due to Throwable, " + t.getClass().getName() + " " + t.getMessage());
Thread.sleep(intervalms);
continue;
}
@@ -40,4 +59,29 @@ public class RetryUtil {
break;
}
}
+
+ /** Returns true if {@code t} is an instance of at least one class in {@code classes}. */
+ private static boolean isInstanceOf(Set<Class> classes, Throwable t) {
+ for (Class c : classes) {
+ if (c.isInstance(t)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Repeatedly runs {@code cmd} until it returns true or {@code timeoutms} elapses,
+ * pausing {@code intervalms} between unsuccessful attempts.
+ *
+ * @param timeoutms how long to keep retrying, in milliseconds
+ * @param intervalms pause between unsuccessful attempts, in milliseconds; values <= 0 retry immediately
+ * @param cmd the condition to poll
+ * @throws SolrException (SERVER_ERROR) if the deadline passes without a successful attempt,
+ * or if the thread is interrupted while pausing (the interrupt flag is restored)
+ */
+ public static void retryOnBoolean(long timeoutms, long intervalms, BooleanRetryCmd cmd) {
+ long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
+ while (true) {
+ if (cmd.execute()) {
+ // success -- even if it arrived after the deadline, it is not a timeout
+ return;
+ }
+ if (System.nanoTime() >= timeout) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out while retrying operation");
+ }
+ // honor intervalms instead of busy-spinning on cmd until the deadline
+ if (intervalms > 0) {
+ try {
+ Thread.sleep(intervalms);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted while retrying operation", e);
+ }
+ }
+ }
+ }
+
}