You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/08/30 18:23:18 UTC
svn commit: r1379002 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/core/
core/src/java/org/apache/solr/handler/admin/
core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/ ...
Author: markrmiller
Date: Thu Aug 30 16:23:16 2012
New Revision: 1379002
URL: http://svn.apache.org/viewvc?rev=1379002&view=rev
Log:
SOLR-3750: On session expiration, we should explicitly wait some time before running the leader sync process so that we are sure every node participates.
SOLR-3772: On cluster startup, we should wait until we see all registered replicas before running the leader process, or, if they do not all come up, wait N amount of time.
SOLR-3752: When a leader goes down, have the Overseer clear the leader state in cluster.json
SOLR-3751: Add defensive checks for SolrCloud updates and requests that ensure the local state matches what we can tell the request expected.
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Aug 30 16:23:16 2012
@@ -93,6 +93,14 @@ Bug Fixes
* SOLR-3721: Fix bug that could allow multiple recoveries to run briefly at
the same time if the recovery thread join call was interrupted.
(Per Steffensen, Mark Miller)
+
+* SOLR-3750: On session expiration, we should explicitly wait some time before
+ running the leader sync process so that we are sure every node participates.
+ (Per Steffensen, Mark Miller)
+
+* SOLR-3772: On cluster startup, we should wait until we see all registered
+ replicas before running the leader process - or if they all do not come up,
+ N amount of time. (Jan Høydahl, Per Steffensen, Mark Miller)
Other Changes
----------------------
@@ -111,6 +119,12 @@ Other Changes
* SOLR-2747: Updated changes2html.pl to handle Solr's CHANGES.txt; added
target 'changes-to-html' to solr/build.xml.
(Steve Rowe, Robert Muir)
+
+* SOLR-3752: When a leader goes down, have the Overseer clear the leader state
+ in cluster.json (Mark Miller)
+
+* SOLR-3751: Add defensive checks for SolrCloud updates and requests that ensure
+ the local state matches what we can tell the request expected. (Mark Miller)
================== 4.0.0-BETA ===================
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java Thu Aug 30 16:23:16 2012
@@ -26,6 +26,12 @@ public class CloudDescriptor {
private String roles = null;
private Integer numShards;
+ volatile boolean isLeader = false;
+
+ public boolean isLeader() {
+ return isLeader;
+ }
+
public void setShardId(String shardId) {
this.shardId = shardId;
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Thu Aug 30 16:23:16 2012
@@ -2,6 +2,8 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -15,7 +17,6 @@ import org.apache.solr.core.CoreContaine
import org.apache.solr.core.SolrCore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,31 +83,26 @@ class ShardLeaderElectionContextBase ext
}
@Override
- void runLeaderProcess(boolean weAreReplacement)
- throws KeeperException, InterruptedException, IOException {
-
- try {
- zkClient.makePath(leaderPath,
- leaderProps == null ? null : ZkStateReader.toJSON(leaderProps),
- CreateMode.EPHEMERAL, true);
- } catch (NodeExistsException e) {
- // if a previous leader ephemeral still exists for some reason, try and
- // remove it
- zkClient.delete(leaderPath, -1, true);
- zkClient.makePath(leaderPath,
- leaderProps == null ? null : ZkStateReader.toJSON(leaderProps),
- CreateMode.EPHEMERAL, true);
- }
+ void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
+ InterruptedException, IOException {
+ // this pause is important (and seems to work also at 100ms to 1 second in
+ // many cases),
+ // but I don't know why yet :*( - it must come before this publish call
+ // and can happen at the start of leader election process even
+ Thread.sleep(500);
+
+ zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
+ CreateMode.EPHEMERAL, true);
- // TODO: above we make it looks like leaderProps could be true, but here
- // you would get an NPE if it was.
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- "leader", ZkStateReader.SHARD_ID_PROP, shardId,
- ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.BASE_URL_PROP,
- leaderProps.getProperties().get(ZkStateReader.BASE_URL_PROP),
- ZkStateReader.CORE_NAME_PROP, leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP));
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "leader",
+ ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
+ collection, ZkStateReader.BASE_URL_PROP, leaderProps.getProperties()
+ .get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP,
+ leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
+ ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
- }
+
+ }
}
@@ -117,66 +113,182 @@ final class ShardLeaderElectionContext e
private ZkController zkController;
private CoreContainer cc;
private SyncStrategy syncStrategy = new SyncStrategy();
+
+ private boolean afterExpiration;
public ShardLeaderElectionContext(LeaderElector leaderElector,
final String shardId, final String collection,
- final String shardZkNodeName, ZkNodeProps props, ZkController zkController, CoreContainer cc) {
+ final String shardZkNodeName, ZkNodeProps props, ZkController zkController, CoreContainer cc, boolean afterExpiration) {
super(leaderElector, shardId, collection, shardZkNodeName, props,
zkController.getZkStateReader());
this.zkController = zkController;
this.cc = cc;
+ this.afterExpiration = afterExpiration;
}
@Override
- void runLeaderProcess(boolean weAreReplacement)
- throws KeeperException, InterruptedException, IOException {
- if (cc != null) {
- String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
- SolrCore core = null;
+ void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
+ InterruptedException, IOException {
+ String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
+
+ // clear the leader in clusterstate
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "leader",
+ ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
+ collection);
+ Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
+
+ waitForReplicasToComeUp(weAreReplacement);
+
+ // wait for local leader state to clear...
+ // int tries = 0;
+ // while (zkController.getClusterState().getLeader(collection, shardId) !=
+ // null) {
+ // System.out.println("leader still shown " + tries + " " +
+ // zkController.getClusterState().getLeader(collection, shardId));
+ // Thread.sleep(1000);
+ // tries++;
+ // if (tries == 30) {
+ // break;
+ // }
+ // }
+ // Thread.sleep(1000);
+
+ SolrCore core = null;
+ try {
+
+ core = cc.getCore(coreName);
+
+ if (core == null) {
+ cancelElection();
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Fatal Error, SolrCore not found:" + coreName + " in "
+ + cc.getCoreNames());
+ }
+
+ // should I be leader?
+ if (weAreReplacement && !shouldIBeLeader(leaderProps, core)) {
+ // System.out.println("there is a better leader candidate it appears");
+ rejoinLeaderElection(leaderSeqPath, core);
+ return;
+ }
+
+ if (weAreReplacement) {
+ log.info("I may be the new leader - try and sync");
+ // we are going to attempt to be the leader
+ // first cancel any current recovery
+ core.getUpdateHandler().getSolrCoreState().cancelRecovery();
+ boolean success = syncStrategy.sync(zkController, core, leaderProps);
+ // solrcloud_debug
+ // try {
+ // RefCounted<SolrIndexSearcher> searchHolder =
+ // core.getNewestSearcher(false);
+ // SolrIndexSearcher searcher = searchHolder.get();
+ // try {
+ // System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName()
+ // + " synched "
+ // + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+ // } finally {
+ // searchHolder.decref();
+ // }
+ // } catch (Exception e) {
+ //
+ // }
+ if (!success && anyoneElseActive()) {
+ rejoinLeaderElection(leaderSeqPath, core);
+ return;
+ }
+ }
+
+ log.info("I am the new leader: "
+ + ZkCoreNodeProps.getCoreUrl(leaderProps));
+
+ } finally {
+ if (core != null) {
+ core.close();
+ }
+ }
+
+ try {
+ super.runLeaderProcess(weAreReplacement);
+ } catch (Throwable t) {
+ cancelElection();
try {
-
core = cc.getCore(coreName);
-
- if (core == null) {
- cancelElection();
- throw new SolrException(ErrorCode.SERVER_ERROR, "Fatal Error, SolrCore not found:" + coreName + " in " + cc.getCoreNames());
+ core.getCoreDescriptor().getCloudDescriptor().isLeader = false;
+ if (!cc.isShutDown()) {
+ // we could not publish ourselves as leader - rejoin election
+ rejoinLeaderElection(coreName, core);
}
- // should I be leader?
- if (weAreReplacement && !shouldIBeLeader(leaderProps)) {
- // System.out.println("there is a better leader candidate it appears");
- rejoinLeaderElection(leaderSeqPath, core);
- return;
+ } finally {
+ if (core != null) {
+ core.close();
}
+ }
+
+ }
+
+ try {
+ core = cc.getCore(coreName);
+ // we do this after the above super. call so that we don't
+ // briefly think we are the leader and then end up not being
+ // able to publish that we are the leader.
+ core.getCoreDescriptor().getCloudDescriptor().isLeader = true;
+ } finally {
+ if (core != null) {
+ core.close();
+ }
+ }
+
+ }
- if (weAreReplacement) {
- if (zkClient.exists(leaderPath, true)) {
- zkClient.delete(leaderPath, -1, true);
+ private void waitForReplicasToComeUp(boolean weAreReplacement)
+ throws InterruptedException {
+ int retries = 300; // ~ 5 min
+ boolean tryAgain = true;
+ Slice slices = zkController.getClusterState().getSlice(collection, shardId);
+ log.info("Running the leader process. afterExperiation=" + afterExpiration);
+ while (tryAgain || slices == null) {
+
+ // wait for everyone to be up
+ if (slices != null) {
+ Map<String,ZkNodeProps> shards = slices.getShards();
+ Set<Entry<String,ZkNodeProps>> entrySet = shards.entrySet();
+ int found = 0;
+ tryAgain = false;
+ for (Entry<String,ZkNodeProps> entry : entrySet) {
+ ZkCoreNodeProps props = new ZkCoreNodeProps(entry.getValue());
+ if (props.getState().equals(ZkStateReader.ACTIVE)
+ && zkController.getClusterState().liveNodesContain(
+ props.getNodeName())) {
+ found++;
}
- log.info("I may be the new leader - try and sync");
- // we are going to attempt to be the leader
- // first cancel any current recovery
- core.getUpdateHandler().getSolrCoreState().cancelRecovery();
- boolean success = syncStrategy.sync(zkController, core, leaderProps);
- if (!success && anyoneElseActive()) {
- rejoinLeaderElection(leaderSeqPath, core);
- return;
- }
}
- log.info("I am the new leader: " + ZkCoreNodeProps.getCoreUrl(leaderProps));
- // If I am going to be the leader I have to be active
- core.getUpdateHandler().getSolrCoreState().cancelRecovery();
- zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
+ // on startup and after connection timeout, wait for all known shards
+ if ((afterExpiration || !weAreReplacement)
+ && found >= slices.getShards().size()) {
+ log.info("Enough replicas found to continue.");
+ tryAgain = false;
+ } else if (!afterExpiration && found >= slices.getShards().size() - 1) {
+ // a previous leader went down - wait for one less than the total
+ // known shards
+ log.info("Enough replicas found to continue.");
+ tryAgain = false;
+ } else {
+ log.info("Waiting until we see more replicas up");
+ }
- } finally {
- if (core != null ) {
- core.close();
+ retries--;
+ if (retries == 0) {
+ log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
+ break;
}
}
-
+ if (tryAgain) {
+ Thread.sleep(1000);
+ slices = zkController.getClusterState().getSlice(collection, shardId);
+ }
}
-
- super.runLeaderProcess(weAreReplacement);
}
private void rejoinLeaderElection(String leaderSeqPath, SolrCore core)
@@ -195,7 +307,8 @@ final class ShardLeaderElectionContext e
leaderElector.joinElection(this);
}
- private boolean shouldIBeLeader(ZkNodeProps leaderProps) {
+ private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core) {
+ log.info("Checking if I should try and be the leader.");
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
Map<String,Slice> slices = clusterState.getSlices(this.collection);
Slice slice = slices.get(shardId);
@@ -210,6 +323,7 @@ final class ShardLeaderElectionContext e
&& clusterState.liveNodesContain(shard.getValue().get(
ZkStateReader.NODE_NAME_PROP))) {
// we are alive
+ log.info("I am Active and live, it's okay to be the leader.");
return true;
}
}
@@ -222,7 +336,19 @@ final class ShardLeaderElectionContext e
foundSomeoneElseActive = true;
}
}
-
+ if (!foundSomeoneElseActive) {
+ log.info("I am not Active but no one else is either, it's okay to be the leader");
+ try {
+ zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
+ } catch (KeeperException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ } else {
+ log.info("I am not Active and someone else appears to be a better leader candidate.");
+ }
return !foundSomeoneElseActive;
}
@@ -261,24 +387,16 @@ final class OverseerElectionContext exte
}
@Override
- void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException {
+ void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
+ InterruptedException {
- final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1);
+ final String id = leaderSeqPath
+ .substring(leaderSeqPath.lastIndexOf("/") + 1);
ZkNodeProps myProps = new ZkNodeProps("id", id);
-
- try {
- zkClient.makePath(leaderPath,
- ZkStateReader.toJSON(myProps),
- CreateMode.EPHEMERAL, true);
- } catch (NodeExistsException e) {
- // if a previous leader ephemeral still exists for some reason, try and
- // remove it
- zkClient.delete(leaderPath, -1, true);
- zkClient.makePath(leaderPath,
- ZkStateReader.toJSON(myProps),
- CreateMode.EPHEMERAL, true);
- }
-
+
+ zkClient.makePath(leaderPath, ZkStateReader.toJSON(myProps),
+ CreateMode.EPHEMERAL, true);
+
overseer.start(id);
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Thu Aug 30 16:23:16 2012
@@ -93,6 +93,13 @@ public class LeaderElector {
sortSeqs(seqs);
List<Integer> intSeqs = getSeqs(seqs);
if (seq <= intSeqs.get(0)) {
+ // first we delete the node advertising the old leader in case the ephem is still there
+ try {
+ zkClient.delete(context.leaderPath, -1, true);
+ } catch(Exception e) {
+ // fine
+ }
+
runIamLeaderProcess(context, replacement);
} else {
// I am not the leader - watch the node below me
@@ -138,6 +145,7 @@ public class LeaderElector {
} catch (KeeperException.SessionExpiredException e) {
throw e;
} catch (KeeperException e) {
+ SolrException.log(log, "Failed setting watch", e);
// we couldn't set our watch - the node before us may already be down?
// we need to check if we are the leader again
checkIfIamLeader(seq, context, true);
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Thu Aug 30 16:23:16 2012
@@ -166,12 +166,19 @@ public class Overseer {
} else if (DELETECORE.equals(operation)) {
clusterState = removeCore(clusterState, message);
} else if (ZkStateReader.LEADER_PROP.equals(operation)) {
+
+ StringBuilder sb = new StringBuilder();
String baseUrl = message.get(ZkStateReader.BASE_URL_PROP);
String coreName = message.get(ZkStateReader.CORE_NAME_PROP);
- final String leaderUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+ sb.append(baseUrl);
+ if (baseUrl != null && !baseUrl.endsWith("/")) sb.append("/");
+ sb.append(coreName == null ? "" : coreName);
+ if (!(sb.substring(sb.length() - 1).equals("/"))) sb.append("/");
clusterState = setShardLeader(clusterState,
message.get(ZkStateReader.COLLECTION_PROP),
- message.get(ZkStateReader.SHARD_ID_PROP), leaderUrl);
+ message.get(ZkStateReader.SHARD_ID_PROP),
+ sb.length() > 0 ? sb.toString() : null);
+
} else {
throw new RuntimeException("unknown operation:" + operation
+ " contents:" + message.getProperties());
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Thu Aug 30 16:23:16 2012
@@ -185,6 +185,7 @@ public class RecoveryStrategy extends Th
prepCmd.setCoreNodeName(coreZkNodeName);
prepCmd.setState(ZkStateReader.RECOVERING);
prepCmd.setCheckLive(true);
+ prepCmd.setOnlyIfLeader(true);
prepCmd.setPauseFor(6000);
server.request(prepCmd);
@@ -239,6 +240,7 @@ public class RecoveryStrategy extends Th
return;
}
+ boolean firstTime = true;
List<Long> recentVersions;
UpdateLog.RecentUpdates recentUpdates = null;
@@ -273,9 +275,6 @@ public class RecoveryStrategy extends Th
log.info("###### startupVersions=" + startingVersions);
}
-
- boolean firstTime = true;
-
if (recoveringAfterStartup) {
// if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
// when we went down. We may have received updates since then.
@@ -305,7 +304,10 @@ public class RecoveryStrategy extends Th
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
boolean isLeader = leaderUrl.equals(ourUrl);
- if (isLeader) {
+ if (isLeader && !cloudDesc.isLeader) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
+ }
+ if (cloudDesc.isLeader) {
// we are now the leader - no one else must have been suitable
log.warn("We have not yet recovered - but we are now the leader! core=" + coreName);
log.info("Finished recovery process. core=" + coreName);
@@ -333,9 +335,6 @@ public class RecoveryStrategy extends Th
new ModifiableSolrParams());
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
log.info("PeerSync Recovery was successful - registering as Active. core=" + coreName);
- // System.out
- // .println("Sync Recovery was successful - registering as Active "
- // + zkController.getNodeName());
// solrcloud_debug
// try {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Thu Aug 30 16:23:16 2012
@@ -108,7 +108,8 @@ public class SyncStrategy {
if (!success
&& !areAnyOtherReplicasActive(zkController, leaderProps, collection,
shardId)) {
- log.info("Sync was not a success but no on else i active! I am the leader");
+ log.info("Sync was not a success but no one else is active! I am the leader");
+ zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
success = true;
}
@@ -224,14 +225,14 @@ public class SyncStrategy {
requestRecovery(((ShardCoreRequest)srsp.getShardRequest()).baseUrl, ((ShardCoreRequest)srsp.getShardRequest()).coreName);
- } catch (Exception e) {
- SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", e);
+ } catch (Throwable t) {
+ SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", t);
}
} else {
log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": " + " sync completed with " + srsp.getShardAddress());
}
+
}
-
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Thu Aug 30 16:23:16 2012
@@ -41,6 +41,7 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkClientConnectionStrategy;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -112,8 +113,12 @@ public final class ZkController {
private final String nodeName; // example: 127.0.0.1:54065_solr
private final String baseURL; // example: http://127.0.0.1:54065/solr
+
private LeaderElector overseerElector;
+
+ // for now, this can be null in tests, in which case recovery will be inactive, and other features
+ // may accept defaults or use mocks rather than pulling things from a CoreContainer
private CoreContainer cc;
protected volatile Overseer overseer;
@@ -181,7 +186,11 @@ public final class ZkController {
// TODO: we need to think carefully about what happens when it was
// a leader that was expired - as well as what to do about leaders/overseers
// with connection loss
- register(descriptor.getName(), descriptor, true);
+ try {
+ register(descriptor.getName(), descriptor, true, true);
+ } catch (Throwable t) {
+ SolrException.log(log, "Error registering SolrCore", t);
+ }
}
}
@@ -200,6 +209,45 @@ public final class ZkController {
});
+
+ zkClient.getZkClientConnectionStrategy().addDisconnectedListener(new ZkClientConnectionStrategy.DisconnectedListener() {
+
+ @Override
+ public void disconnected() {
+ List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
+ // re register all descriptors
+ if (descriptors != null) {
+ for (CoreDescriptor descriptor : descriptors) {
+ descriptor.getCloudDescriptor().isLeader = false;
+ }
+ }
+ }
+ });
+
+ zkClient.getZkClientConnectionStrategy().addConnectedListener(new ZkClientConnectionStrategy.ConnectedListener() {
+
+ @Override
+ public void connected() {
+ List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
+ if (descriptors != null) {
+ for (CoreDescriptor descriptor : descriptors) {
+ CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
+ String leaderUrl;
+ try {
+ leaderUrl = getLeaderProps(cloudDesc.getCollectionName(), cloudDesc.getShardId())
+ .getCoreUrl();
+ } catch (InterruptedException e) {
+ throw new RuntimeException();
+ }
+ String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(), descriptor.getName());
+ boolean isLeader = leaderUrl.equals(ourUrl);
+ log.info("SolrCore connected to ZooKeeper - we are " + ourUrl + " and leader is " + leaderUrl);
+ cloudDesc.isLeader = isLeader;
+ }
+ }
+ }
+ });
+
this.overseerJobQueue = Overseer.getInQueue(zkClient);
this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
cmdExecutor = new ZkCmdExecutor();
@@ -468,7 +516,7 @@ public final class ZkController {
* @throws Exception
*/
public String register(String coreName, final CoreDescriptor desc) throws Exception {
- return register(coreName, desc, false);
+ return register(coreName, desc, false, false);
}
@@ -478,10 +526,11 @@ public final class ZkController {
* @param coreName
* @param desc
* @param recoverReloadedCores
+ * @param afterExpiration
* @return the shardId for the SolrCore
* @throws Exception
*/
- public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores) throws Exception {
+ public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores, boolean afterExpiration) throws Exception {
final String baseUrl = getBaseUrl();
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
@@ -506,7 +555,7 @@ public final class ZkController {
ZkNodeProps leaderProps = new ZkNodeProps(props);
try {
- joinElection(desc);
+ joinElection(desc, afterExpiration);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
@@ -517,25 +566,7 @@ public final class ZkController {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
- // rather than look in the cluster state file, we go straight to the zknodes
- // here, because on cluster restart there could be stale leader info in the
- // cluster state node that won't be updated for a moment
- String leaderUrl = getLeaderProps(collection, cloudDesc.getShardId()).getCoreUrl();
-
- // now wait until our currently cloud state contains the latest leader
- String clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId, 30000);
- int tries = 0;
- while (!leaderUrl.equals(clusterStateLeader)) {
- if (tries == 60) {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "There is conflicting information about the leader of shard: "
- + cloudDesc.getShardId() + " our state says:" + clusterStateLeader + " but zookeeper says:" + leaderUrl);
- }
- Thread.sleep(1000);
- tries++;
- clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId, 30000);
- leaderUrl = getLeaderProps(collection, cloudDesc.getShardId()).getCoreUrl();
- }
+ String leaderUrl = getLeader(cloudDesc);
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
log.info("We are " + ourUrl + " and leader is " + leaderUrl);
@@ -568,8 +599,7 @@ public final class ZkController {
} else {
log.info("No LogReplay needed for core="+core.getName() + " baseURL=" + baseUrl);
}
- }
-
+ }
boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
collection, coreZkNodeName, shardId, leaderProps, core, cc);
if (!didRecovery) {
@@ -580,12 +610,52 @@ public final class ZkController {
core.close();
}
}
+
// make sure we have an update cluster state right away
zkStateReader.updateClusterState(true);
return shardId;
}
+
+ private String getLeader(final CloudDescriptor cloudDesc) {
+
+ String collection = cloudDesc.getCollectionName();
+ String shardId = cloudDesc.getShardId();
+ // rather than look in the cluster state file, we go straight to the zknodes
+ // here, because on cluster restart there could be stale leader info in the
+ // cluster state node that won't be updated for a moment
+ String leaderUrl;
+ try {
+ leaderUrl = getLeaderProps(collection, cloudDesc.getShardId())
+ .getCoreUrl();
+
+ // now wait until our currently cloud state contains the latest leader
+ String clusterStateLeader = zkStateReader.getLeaderUrl(collection,
+ shardId, 30000);
+ int tries = 0;
+ while (!leaderUrl.equals(clusterStateLeader)) {
+ if (tries == 60) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "There is conflicting information about the leader of shard: "
+ + cloudDesc.getShardId() + " our state says:"
+ + clusterStateLeader + " but zookeeper says:" + leaderUrl);
+ }
+ Thread.sleep(1000);
+ tries++;
+ clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId,
+ 30000);
+ leaderUrl = getLeaderProps(collection, cloudDesc.getShardId())
+ .getCoreUrl();
+ }
+
+ } catch (Exception e) {
+ log.error("Error getting leader from zk", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Error getting leader from zk", e);
+ }
+ return leaderUrl;
+ }
/**
* Get leader props directly from zk nodes.
@@ -597,8 +667,9 @@ public final class ZkController {
* @throws InterruptedException
*/
private ZkCoreNodeProps getLeaderProps(final String collection,
- final String slice) throws KeeperException, InterruptedException {
+ final String slice) throws InterruptedException {
+ // up to 60 attempts, 500ms apart; any failure other than an interrupt is retried
int iterCount = 60;
+ // last failure seen, attached as the cause if every attempt fails
+ Exception exp = null;
while (iterCount-- > 0) {
try {
byte[] data = zkClient.getData(
@@ -607,15 +678,21 @@ public final class ZkController {
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(
ZkNodeProps.load(data));
return leaderProps;
- } catch (NoNodeException e) {
+ } catch (InterruptedException e) {
+ // never retry on interrupt; propagate it to the caller unchanged
+ throw e;
+ } catch (Exception e) {
+ exp = e;
Thread.sleep(500);
}
+ // stop retrying once the core container has been shut down
+ if (cc.isShutDown()) {
+ throw new RuntimeException("CoreContainer is shutdown");
+ }
}
- throw new RuntimeException("Could not get leader props");
+ throw new RuntimeException("Could not get leader props", exp);
}
+ // afterExpiration is handed through to the ShardLeaderElectionContext; presumably
+ // it marks a rejoin after a ZooKeeper session expiration (SOLR-3750) - confirm there
- private void joinElection(CoreDescriptor cd) throws InterruptedException, KeeperException, IOException {
+ private void joinElection(CoreDescriptor cd, boolean afterExpiration) throws InterruptedException, KeeperException, IOException {
String shardId = cd.getCloudDescriptor().getShardId();
@@ -631,7 +708,7 @@ public final class ZkController {
.getCollectionName();
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
- collection, coreZkNodeName, ourProps, this, cc);
+ collection, coreZkNodeName, ourProps, this, cc, afterExpiration);
leaderElector.setup(context);
electionContexts.put(coreZkNodeName, context);
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java Thu Aug 30 16:23:16 2012
@@ -598,15 +598,16 @@ public class CoreContainer
}
cores.clear();
} finally {
+ // close order: shard handler factory first, then zkController, then
+ // zkServer (each only if set)
+ if (shardHandlerFactory != null) {
+ shardHandlerFactory.close();
+ }
+ // we want to close zk stuff last
if(zkController != null) {
zkController.close();
}
if (zkServer != null) {
zkServer.stop();
}
- if (shardHandlerFactory != null) {
- shardHandlerFactory.close();
- }
}
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Thu Aug 30 16:23:16 2012
@@ -721,6 +721,21 @@ public class CoreAdminHandler extends Re
props.put(ZkStateReader.NODE_NAME_PROP, zkController.getNodeName());
boolean success = syncStrategy.sync(zkController, core, new ZkNodeProps(props));
+ // solrcloud_debug: uncomment to print this node's name and its doc count right after the sync
+// try {
+// RefCounted<SolrIndexSearcher> searchHolder =
+// core.getNewestSearcher(false);
+// SolrIndexSearcher searcher = searchHolder.get();
+// try {
+// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName()
+// + " synched "
+// + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+// } finally {
+// searchHolder.decref();
+// }
+// } catch (Exception e) {
+//
+// }
if (!success) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Sync Failed");
}
@@ -750,8 +765,11 @@ public class CoreAdminHandler extends Re
String coreNodeName = params.get("coreNodeName");
String waitForState = params.get("state");
Boolean checkLive = params.getBool("checkLive");
+ // optional; when true, fail with BAD_REQUEST unless this core's CloudDescriptor says it is leader
+ Boolean onlyIfLeader = params.getBool("onlyIfLeader");
int pauseFor = params.getInt("pauseFor", 0);
+
+
String state = null;
boolean live = false;
int retry = 0;
@@ -764,6 +782,12 @@ public class CoreAdminHandler extends Re
+ cname);
}
if (core != null) {
+ // lets the requester fail fast if leadership has moved away from this core
+ if (onlyIfLeader != null && onlyIfLeader) {
+ if (!core.getCoreDescriptor().getCloudDescriptor().isLeader()) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "We are not the leader");
+ }
+ }
+
// wait until we are sure the recovering node is ready
// to accept updates
CloudDescriptor cloudDescriptor = core.getCoreDescriptor()
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Thu Aug 30 16:23:16 2012
@@ -166,6 +166,8 @@ public class SolrCmdDistributor {
addCommit(ureq, cmd);
+ log.info("Distrib commit to:" + nodes);
+
for (Node node : nodes) {
submit(ureq, node);
}
@@ -345,7 +347,8 @@ public class SolrCmdDistributor {
try {
semaphore.acquire();
} catch (InterruptedException e) {
- throw new RuntimeException();
+ // restore the interrupt flag and keep the interrupt as the cause
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Update thread interrupted", e);
}
pending.add(completionService.submit(task));
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Thu Aug 30 16:23:16 2012
@@ -183,15 +183,9 @@ public class DistributedUpdateProcessor
// set num nodes
numNodes = zkController.getClusterState().getLiveNodes().size();
- // the leader is...
- // TODO: if there is no leader, wait and look again
- // TODO: we are reading the leader from zk every time - we should cache
- // this and watch for changes?? Just pull it from ZkController cluster state probably?
String shardId = getShard(hash, collection, zkController.getClusterState()); // get the right shard based on the hash...
try {
- // TODO: if we find out we cannot talk to zk anymore, we should probably realize we are not
- // a leader anymore - we shouldn't accept updates at all??
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps(
collection, shardId));
@@ -201,7 +195,10 @@ public class DistributedUpdateProcessor
isLeader = coreNodeName.equals(leaderNodeName);
DistribPhase phase =
- DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+ DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+
+ doDefensiveChecks(shardId, phase);
+
if (DistribPhase.FROMLEADER == phase) {
// we are coming from the leader, just go local - add no urls
@@ -251,6 +248,36 @@ public class DistributedUpdateProcessor
return nodes;
}
+ /**
+ * Sanity-checks a request that claims to come from the shard leader against
+ * our local view of leadership (SOLR-3751). Only FROMLEADER requests that
+ * carry a "distrib.from" param are checked; the param is absent on log replay.
+ *
+ * @throws SolrException (BAD_REQUEST) if this core itself is the leader, or if
+ * the sender is not the leader recorded in our cluster state
+ */
+ private void doDefensiveChecks(String shardId, DistribPhase phase) {
+ String from = req.getParams().get("distrib.from");
+ boolean localIsLeader = req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader();
+ if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
+ log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Request says it is coming from leader, but we are the leader");
+ }
+
+ if (DistribPhase.FROMLEADER == phase && from != null) { // from will be null on log replay
+
+ ZkCoreNodeProps clusterStateLeader = new ZkCoreNodeProps(zkController
+ .getClusterState().getLeader(collection, shardId));
+
+ if (clusterStateLeader.getNodeProps() == null
+ || !clusterStateLeader.getCoreUrl().equals(from)) {
+ String coreUrl = null;
+ if (clusterStateLeader.getNodeProps() != null) {
+ coreUrl = clusterStateLeader.getCoreUrl();
+ }
+ log.error("We got a request from the leader, but it's not who our cluster state says is the leader :"
+ + req.getParamString()
+ + " : "
+ + coreUrl);
+
+ // reject the request: the exception must be thrown, not merely constructed
+ throw new SolrException(ErrorCode.BAD_REQUEST, "We got a request from the leader, but it's not who our cluster state says is the leader.");
+ }
+
+ }
+ }
+
private String getShard(int hash, String collection, ClusterState clusterState) {
// ranges should be part of the cloud state and eventually gotten from zk
@@ -329,6 +356,8 @@ public class DistributedUpdateProcessor
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString()));
params.remove("commit"); // this will be distributed from the local commit
+ params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
cmdDistrib.distribAdd(cmd, nodes, params);
}
@@ -378,9 +407,11 @@ public class DistributedUpdateProcessor
// TODO: we should do this in the background it would seem
for (SolrCmdDistributor.Error error : response.errors) {
- if (error.node instanceof RetryNode) {
+ if (error.node instanceof RetryNode || error.e instanceof SolrException) {
// we don't try to force a leader to recover
// when we cannot forward to it
+ // and we assume SolrException means
+ // the node went down
continue;
}
// TODO: we should force their state to recovering ??
@@ -658,6 +689,10 @@ public class DistributedUpdateProcessor
(isLeader ?
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString()));
+ if (isLeader) {
+ params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+ }
params.remove("commit"); // we already will have forwarded this from our local commit
cmdDistrib.distribDelete(cmd, nodes, params);
}
@@ -819,6 +854,8 @@ public class DistributedUpdateProcessor
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+ // same param name as the other distrib paths and doDefensiveChecks use;
+ // a different name would leave replicas seeing no sender at all
+ params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
cmdDistrib.distribDelete(cmd, replicas, params);
cmdDistrib.finish();
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java Thu Aug 30 16:23:16 2012
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase {
public static Logger log = LoggerFactory.getLogger(ChaosMonkeyNothingIsSafeTest.class);
- private static final int BASE_RUN_LENGTH = 45000;
+ private static final int BASE_RUN_LENGTH = 20000;
@BeforeClass
public static void beforeSuperClass() {
@@ -56,8 +56,8 @@ public class ChaosMonkeyNothingIsSafeTes
@Override
public void setUp() throws Exception {
super.setUp();
- // TODO use @Noisy annotation as we expect lots of exceptions
- //ignoreException(".*");
+ // can help to hide this when testing and looking at logs
+ //ignoreException("shard update error");
System.setProperty("numShards", Integer.toString(sliceCount));
}
@@ -71,8 +71,8 @@ public class ChaosMonkeyNothingIsSafeTes
public ChaosMonkeyNothingIsSafeTest() {
super();
- sliceCount = 3;
- shardCount = 12;
+ sliceCount = 1;
+ shardCount = 7;
}
@Override
@@ -83,9 +83,16 @@ public class ChaosMonkeyNothingIsSafeTes
handle.put("QTime", SKIPVAL);
handle.put("timestamp", SKIPVAL);
+ // make sure we have leaders for each shard; names start at "shard1", so the
+ // bound is inclusive (with j < sliceCount a single-shard run checked nothing)
+ for (int j = 1; j <= sliceCount; j++) {
+ zkStateReader.getLeaderProps(DEFAULT_COLLECTION, "shard" + j, 10000);
+ }
+
+ waitForRecoveriesToFinish(false);
+
// we cannot do delete by query
// as it's not supported for recovery
- // del("*:*");
+ del("*:*");
List<StopableThread> threads = new ArrayList<StopableThread>();
int threadCount = 1;
@@ -152,6 +159,7 @@ public class ChaosMonkeyNothingIsSafeTes
zkStateReader.updateClusterState(true);
assertTrue(zkStateReader.getClusterState().getLiveNodes().size() > 0);
+
// we dont't current check vs control because the full throttle thread can
// have request fails
checkShardConsistency(false, true);
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java Thu Aug 30 16:23:16 2012
@@ -50,11 +50,6 @@ public class ChaosMonkeySafeLeaderTest e
@Override
public void setUp() throws Exception {
super.setUp();
- // we expect this time of exception as shards go up and down...
- //ignoreException(".*");
-
- // sometimes we cannot get the same port
- ignoreException("java\\.net\\.BindException: Address already in use");
+ // this test no longer registers any ignoreException patterns of its own
System.setProperty("numShards", Integer.toString(sliceCount));
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java Thu Aug 30 16:23:16 2012
@@ -121,7 +121,9 @@ public class CoreAdminRequest extends So
protected String state;
protected Boolean checkLive;
protected Integer pauseFor;
+ protected Boolean onlyIfLeader;
+
public WaitForState() {
action = CoreAdminAction.PREPRECOVERY;
}
@@ -166,6 +168,14 @@ public class CoreAdminRequest extends So
this.pauseFor = pauseFor;
}
+ /**
+ * @return true only if onlyIfLeader was explicitly set to true; an unset
+ * value reads as false instead of throwing NullPointerException on unboxing
+ */
+ public boolean isOnlyIfLeader() {
+ return onlyIfLeader != null && onlyIfLeader;
+ }
+
+ /**
+ * Asks the receiving core to reject the request unless it is the leader.
+ * Sent as the "onlyIfLeader" param; omitted from the request when never set.
+ */
+ public void setOnlyIfLeader(boolean onlyIfLeader) {
+ this.onlyIfLeader = onlyIfLeader;
+ }
+
@Override
public SolrParams getParams() {
if( action == null ) {
@@ -195,6 +205,10 @@ public class CoreAdminRequest extends So
if (pauseFor != null) {
params.set( "pauseFor", pauseFor);
}
+
+ if (onlyIfLeader != null) {
+ params.set( "onlyIfLeader", onlyIfLeader);
+ }
return params;
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java Thu Aug 30 16:23:16 2012
@@ -38,6 +38,8 @@ class ConnectionManager implements Watch
private boolean connected;
private final ZkClientConnectionStrategy connectionStrategy;
+
+ // held while the reconnect callback waits for the connection and swaps in the
+ // new SolrZooKeeper; final so every thread always locks the same object
+ private final Object connectionUpdateLock = new Object();
private String zkServerAddress;
@@ -72,6 +74,7 @@ class ConnectionManager implements Watch
}
if (isClosed) {
+ log.info("Client->ZooKeeper status change trigger but we are already closed");
return;
}
@@ -79,28 +82,25 @@ class ConnectionManager implements Watch
if (state == KeeperState.SyncConnected) {
connected = true;
clientConnected.countDown();
+ connectionStrategy.connected();
} else if (state == KeeperState.Expired) {
connected = false;
- log.info("Attempting to reconnect to recover relationship with ZooKeeper...");
-
+ log.info("Our previous ZooKeeper session was expired. Attempting to reconnect to recover relationship with ZooKeeper...");
+
try {
connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
new ZkClientConnectionStrategy.ZkUpdate() {
@Override
public void update(SolrZooKeeper keeper) {
// if keeper does not replace oldKeeper we must be sure to close it
- synchronized (connectionStrategy) {
+ synchronized (connectionUpdateLock) {
try {
waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
- } catch (InterruptedException e1) {
- closeKeeper(keeper);
- Thread.currentThread().interrupt();
- throw new RuntimeException("Giving up on connecting - we were interrupted", e1);
} catch (Exception e1) {
closeKeeper(keeper);
throw new RuntimeException(e1);
}
-
+ log.info("Connection with ZooKeeper reestablished.");
try {
client.updateKeeper(keeper);
} catch (InterruptedException e) {
@@ -129,7 +129,9 @@ class ConnectionManager implements Watch
}
log.info("Connected:" + connected);
} else if (state == KeeperState.Disconnected) {
+ log.info("zkClient has disconnected");
connected = false;
+ connectionStrategy.disconnected();
} else {
connected = false;
}
@@ -151,19 +153,26 @@ class ConnectionManager implements Watch
}
public synchronized void waitForConnected(long waitForConnection)
- throws InterruptedException, TimeoutException {
+ throws TimeoutException {
+ log.info("Waiting for client to connect to ZooKeeper");
long expire = System.currentTimeMillis() + waitForConnection;
long left = 1;
while (!connected && left > 0) {
if (isClosed) {
break;
}
- wait(500);
+ try {
+ wait(500);
+ } catch (InterruptedException e) {
+ // re-assert the interrupt; callers now see an unchecked exception instead
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
left = expire - System.currentTimeMillis();
}
if (!connected) {
throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
}
+ log.info("Client is connected to ZooKeeper");
}
public synchronized void waitForDisconnected(long timeout)
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Thu Aug 30 16:23:16 2012
@@ -74,6 +74,7 @@ public class SolrZkClient {
private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
private volatile boolean isClosed = false;
+ private ZkClientConnectionStrategy zkClientConnectionStrategy;
/**
* @param zkServerAddress
@@ -116,6 +117,7 @@ public class SolrZkClient {
*/
public SolrZkClient(String zkServerAddress, int zkClientTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) {
+ this.zkClientConnectionStrategy = strat;
connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+ zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
try {
@@ -135,29 +137,24 @@ public class SolrZkClient {
}
}
});
- } catch (IOException e) {
- connManager.close();
- throw new RuntimeException();
- } catch (InterruptedException e) {
- connManager.close();
- throw new RuntimeException();
- } catch (TimeoutException e) {
+ } catch (Throwable e) {
connManager.close();
- throw new RuntimeException();
+ if (e instanceof InterruptedException) {
+ // restore the flag; the interrupt is being turned into an unchecked exception
+ Thread.currentThread().interrupt();
+ }
+ // keep the original failure as the cause instead of an empty RuntimeException
+ throw new RuntimeException(e);
}
+
try {
connManager.waitForConnected(clientConnectTimeout);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- connManager.close();
- throw new RuntimeException();
- } catch (TimeoutException e) {
+ } catch (Throwable e) {
connManager.close();
- throw new RuntimeException();
+ throw new RuntimeException(e);
}
numOpens.incrementAndGet();
}
+ /**
+ * @return the connection strategy passed to the constructor, through which
+ * connected/disconnected listeners can be registered
+ */
+ public ZkClientConnectionStrategy getZkClientConnectionStrategy() {
+ return zkClientConnectionStrategy;
+ }
+
/**
* @return true if client is connected
*/
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java Thu Aug 30 16:23:16 2012
@@ -18,18 +18,65 @@ package org.apache.solr.common.cloud;
*/
import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
+import org.apache.solr.common.SolrException;
import org.apache.zookeeper.SolrZooKeeper;
import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*
*/
public abstract class ZkClientConnectionStrategy {
+ private static final Logger log = LoggerFactory.getLogger(ZkClientConnectionStrategy.class);
+
+ // copy-on-write: notification iterates a snapshot without holding a lock, so a
+ // listener may register further listeners without a ConcurrentModificationException
+ private final List<DisconnectedListener> disconnectedListeners = new CopyOnWriteArrayList<DisconnectedListener>();
+ private final List<ConnectedListener> connectedListeners = new CopyOnWriteArrayList<ConnectedListener>();
+
public abstract void connect(String zkServerAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
public abstract void reconnect(String serverAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
+ /**
+ * Notifies every registered {@link DisconnectedListener}. A listener that
+ * throws is logged and does not stop the remaining listeners from running.
+ */
+ public void disconnected() {
+ for (DisconnectedListener listener : disconnectedListeners) {
+ try {
+ listener.disconnected();
+ } catch (Throwable t) {
+ SolrException.log(log, "", t);
+ }
+ }
+ }
+
+ /**
+ * Notifies every registered {@link ConnectedListener}. A listener that
+ * throws is logged and does not stop the remaining listeners from running.
+ */
+ public void connected() {
+ for (ConnectedListener listener : connectedListeners) {
+ try {
+ listener.connected();
+ } catch (Throwable t) {
+ SolrException.log(log, "", t);
+ }
+ }
+ }
+
+ /** Callback invoked from {@link #disconnected()}. */
+ public interface DisconnectedListener {
+ public void disconnected();
+ }
+
+ /** Callback invoked from {@link #connected()}. */
+ public interface ConnectedListener {
+ public void connected();
+ }
+
+
+ /** Registers a listener; safe to call from any thread, including from a callback. */
+ public void addDisconnectedListener(DisconnectedListener listener) {
+ disconnectedListeners.add(listener);
+ }
+
+ /** Registers a listener; safe to call from any thread, including from a callback. */
+ public void addConnectedListener(ConnectedListener listener) {
+ connectedListeners.add(listener);
+ }
+
public static abstract class ZkUpdate {
public abstract void update(SolrZooKeeper zooKeeper) throws InterruptedException, TimeoutException, IOException;
}
Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Thu Aug 30 16:23:16 2012
@@ -801,7 +801,7 @@ public abstract class AbstractFullDistri
SolrDocumentList lst1 = lastJetty.client.solrClient.query(query).getResults();
SolrDocumentList lst2 = cjetty.client.solrClient.query(query).getResults();
- showDiff(lst1, lst2, lastJetty.toString(), cjetty.client.solrClient.toString());
+ // label each side of the diff with its jetty url
+ showDiff(lst1, lst2, lastJetty.url, cjetty.url);
}
}
@@ -1130,7 +1130,8 @@ public abstract class AbstractFullDistri
try {
commit();
- } catch (Exception e) {
+ } catch (Throwable t) {
+ // NOTE(review): printStackTrace bypasses logging; consider a logger if this class has one
+ t.printStackTrace();
// we don't care if this commit fails on some nodes
}
@@ -1146,8 +1147,8 @@ public abstract class AbstractFullDistri
retry = true;
}
cnt++;
- if (cnt > 2) break;
- Thread.sleep(4000);
+ // bounded retry: give up once cnt exceeds 4, sleeping 2s between attempts
+ if (cnt > 4) break;
+ Thread.sleep(2000);
} while (retry);
}
Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java?rev=1379002&r1=1379001&r2=1379002&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java (original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java Thu Aug 30 16:23:16 2012
@@ -17,7 +17,6 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
-import java.net.BindException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -174,16 +173,10 @@ public class ChaosMonkey {
public static void kill(CloudJettyRunner cjetty) throws Exception {
JettySolrRunner jetty = cjetty.jetty;
monkeyLog("kill shard! " + jetty.getLocalPort());
- FilterHolder fh = jetty.getDispatchFilter();
- SolrDispatchFilter sdf = null;
- if (fh != null) {
- sdf = (SolrDispatchFilter) fh.getFilter();
- }
+
jetty.stop();
- if (sdf != null) {
- sdf.destroy();
- }
+ // NOTE(review): jetty is already stopped here, while the removed code fetched the
+ // dispatch filter *before* stopping - confirm stop(jetty) can still reach and destroy it
+ stop(jetty);
if (!jetty.isStopped()) {
throw new RuntimeException("could not kill jetty");
@@ -441,6 +434,7 @@ public class ChaosMonkey {
}
public static boolean start(JettySolrRunner jetty) throws Exception {
+
try {
jetty.start();
} catch (Exception e) {
@@ -454,7 +448,7 @@ public class ChaosMonkey {
try {
jetty.start();
} catch (Exception e3) {
- log.error("", e3);
+ log.error("Could not get the port to start jetty again", e3);
// we coud not get the port
jetty.stop();
return false;