You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2019/06/03 16:11:32 UTC
[lucene-solr] branch branch_8x updated: SOLR-13489: Stop the leader
from trying to rejoin the election on session expiration and harden our zk
reconnect code path.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new 96f0003 SOLR-13489: Stop the leader from trying to rejoin the election on session expiration and harden our zk reconnect code path.
96f0003 is described below
commit 96f000334ccacc211a6a277cfa7d283c4f80823c
Author: markrmiller <ma...@apache.org>
AuthorDate: Sun Jun 2 21:31:12 2019 -0500
SOLR-13489: Stop the leader from trying to rejoin the election on session expiration and harden our zk reconnect code path.
# Conflicts:
# solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
---
solr/CHANGES.txt | 3 ++
.../org/apache/solr/cloud/ElectionContext.java | 14 +++++-
.../java/org/apache/solr/cloud/LeaderElector.java | 1 +
.../java/org/apache/solr/cloud/ZkController.java | 52 +++++++++++++++-------
.../apache/solr/cloud/ConnectionManagerTest.java | 4 +-
.../test/org/apache/solr/cloud/OverseerTest.java | 19 ++++----
.../impl/ConcurrentUpdateHttp2SolrClient.java | 2 +-
.../solr/common/cloud/ConnectionManager.java | 38 ++++++++--------
.../common/cloud/DefaultConnectionStrategy.java | 6 +--
.../org/apache/solr/common/cloud/OnReconnect.java | 4 +-
10 files changed, 90 insertions(+), 53 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3d22df1..a2d4d29 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -73,6 +73,9 @@ Bug Fixes
* SOLR-13491: SolrZkClient's watch wrapper no longer allows zookeeper to hold the same watch object multiple times.
+* SOLR-13489: Stop the leader from trying to rejoin the election on session expiration and harden our zk reconnect code path.
+ (Mark Miller, Anshum Gupta)
+
Other Changes
----------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index a67ce57..456daee 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -52,6 +52,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.OpResult.SetDataResult;
@@ -489,6 +490,9 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
// we made it as leader - send any recovery requests we need to
syncStrategy.requestRecoveries();
+ } catch (SessionExpiredException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "ZK session expired - cancelling election for " + collection + " " + shardId);
} catch (Exception e) {
isLeader = false;
SolrException.log(log, "There was a problem trying to register as the leader", e);
@@ -503,7 +507,12 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
// we could not publish ourselves as leader - try and rejoin election
- rejoinLeaderElection(core);
+ try {
+ rejoinLeaderElection(core);
+ } catch (SessionExpiredException exc) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "ZK session expired - cancelling election for " + collection + " " + shardId);
+ }
}
}
} else {
@@ -719,6 +728,9 @@ final class OverseerElectionContext extends ElectionContext {
@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException,
InterruptedException {
+ if (isClosed) {
+ return;
+ }
log.info("I am going to be the leader {}", id);
final String id = leaderSeqPath
.substring(leaderSeqPath.lastIndexOf("/") + 1);
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 0cc8cac..80ce82f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -133,6 +133,7 @@ public class LeaderElector {
if (leaderSeqNodeName.equals(seqs.get(0))) {
// I am the leader
try {
+ if (zkClient.isClosed()) return; // but our zkClient is already closed
runIamLeaderProcess(context, replacement);
} catch (KeeperException.NodeExistsException e) {
log.error("node exists",e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 7c0bb97..1814d55 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -335,10 +335,11 @@ public class ZkController implements Closeable {
new OnReconnect() {
@Override
- public void command() {
+ public void command() throws SessionExpiredException {
log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
clearZkCollectionTerms();
try {
+ // recreate our watchers first so that they exist even on any problems below
zkStateReader.createClusterStateWatchersAndUpdate();
// this is troublesome - we dont want to kill anything the old
@@ -368,8 +369,17 @@ public class ZkController implements Closeable {
}
cc.cancelCoreRecoveries();
-
- registerAllCoresAsDown(registerOnReconnect, false);
+
+ try {
+ registerAllCoresAsDown(registerOnReconnect, false);
+ } catch (SessionExpiredException e) {
+ // zk has to reconnect and this will all be tried again
+ throw e;
+ } catch (Exception e) {
+ // this is really best effort - in case of races or failure cases where we now need to be the leader, if anything fails,
+ // just continue
+ log.warn("Exception while trying to register all cores as DOWN", e);
+ }
// we have to register as live first to pick up docs in the buffer
createEphemeralLiveNode();
@@ -422,6 +432,8 @@ public class ZkController implements Closeable {
Thread.currentThread().interrupt();
throw new ZooKeeperException(
SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (SessionExpiredException e) {
+ throw e;
} catch (Exception e) {
SolrException.log(log, "", e);
throw new ZooKeeperException(
@@ -478,7 +490,7 @@ public class ZkController implements Closeable {
}
private void registerAllCoresAsDown(
- final CurrentCoreDescriptorProvider registerOnReconnect, boolean updateLastPublished) {
+ final CurrentCoreDescriptorProvider registerOnReconnect, boolean updateLastPublished) throws SessionExpiredException {
List<CoreDescriptor> descriptors = registerOnReconnect
.getCurrentDescriptors();
if (isClosed) return;
@@ -509,6 +521,9 @@ public class ZkController implements Closeable {
continue;
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
+ } catch (SessionExpiredException e) {
+ // zk has to reconnect
+ throw e;
} catch (KeeperException e) {
log.warn("", e);
Thread.currentThread().interrupt();
@@ -519,7 +534,7 @@ public class ZkController implements Closeable {
log.debug("calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}", new Object[]{coreZkNodeName, collection, slice});
waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
} catch (Exception e) {
- SolrException.log(log, "", e);
+ log.warn("There was a problem while making a best effort to ensure the leader has seen us as down, this is not unexpected as Zookeeper has just reconnected after a session expiration", e);
if (isClosed) {
return;
}
@@ -1384,19 +1399,21 @@ public class ZkController implements Closeable {
/**
* Get leader props directly from zk nodes.
+ * @throws SessionExpiredException on zk session expiration.
*/
public ZkCoreNodeProps getLeaderProps(final String collection,
- final String slice, int timeoutms) throws InterruptedException {
- return getLeaderProps(collection, slice, timeoutms, false);
+ final String slice, int timeoutms) throws InterruptedException, SessionExpiredException {
+ return getLeaderProps(collection, slice, timeoutms, true);
}
/**
* Get leader props directly from zk nodes.
*
* @return leader props
+ * @throws SessionExpiredException on zk session expiration.
*/
public ZkCoreNodeProps getLeaderProps(final String collection,
- final String slice, int timeoutms, boolean failImmediatelyOnExpiration) throws InterruptedException {
+ final String slice, int timeoutms, boolean failImmediatelyOnExpiration) throws InterruptedException, SessionExpiredException {
int iterCount = timeoutms / 1000;
Exception exp = null;
while (iterCount-- > 0) {
@@ -1411,7 +1428,7 @@ public class ZkController implements Closeable {
throw e;
} catch (SessionExpiredException e) {
if (failImmediatelyOnExpiration) {
- throw new RuntimeException("Session has expired - could not get leader props", exp);
+ throw e;
}
exp = e;
Thread.sleep(1000);
@@ -1861,7 +1878,7 @@ public class ZkController implements Closeable {
}
private ZkCoreNodeProps waitForLeaderToSeeDownState(
- CoreDescriptor descriptor, final String coreZkNodeName) {
+ CoreDescriptor descriptor, final String coreZkNodeName) throws SessionExpiredException {
// try not to wait too long here - if we are waiting too long, we should probably
// move along and join the election
@@ -1881,8 +1898,10 @@ public class ZkController implements Closeable {
// go straight to zk, not the cloud state - we want current info
leaderProps = getLeaderProps(collection, shard, 5000);
break;
+ } catch (SessionExpiredException e) {
+ throw e;
} catch (Exception e) {
- SolrException.log(log, "There was a problem finding the leader in zk", e);
+ log.info("Did not find the leader in Zookeeper", e);
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
@@ -1908,11 +1927,11 @@ public class ZkController implements Closeable {
" is already less than leader, so not waiting for leader to see down state.");
} else {
- log.info("Replica need to wait for leader to see down state.");
+ log.info("replica={} is making a best effort attempt to wait for leader={} to see it's DOWN state.", myCoreNodeName, leaderProps.getCoreUrl());
try (HttpSolrClient client = new Builder(leaderBaseUrl)
- .withConnectionTimeout(15000)
- .withSocketTimeout(120000)
+ .withConnectionTimeout(8000) // short timeouts, we may be in a storm and this is best effort and maybe we should be the leader now
+ .withSocketTimeout(30000)
.build()) {
WaitForState prepCmd = new WaitForState();
prepCmd.setCoreName(leaderCoreName);
@@ -1920,9 +1939,8 @@ public class ZkController implements Closeable {
prepCmd.setCoreNodeName(coreZkNodeName);
prepCmd.setState(Replica.State.DOWN);
- // let's retry a couple times - perhaps the leader just went down,
- // or perhaps he is just not quite ready for us yet
- retries = 2;
+ // lets give it another chance, but without taking too long
+ retries = 3;
for (int i = 0; i < retries; i++) {
if (isClosed) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
diff --git a/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java b/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java
index 6684d34..3fa2108 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java
@@ -19,6 +19,8 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.ConnectionManager;
@@ -150,7 +152,7 @@ public class ConnectionManagerTest extends SolrTestCaseJ4 {
@Override
public void reconnect(final String serverAddress, final int zkClientTimeout,
- final Watcher watcher, final ZkUpdate updater) throws IOException {
+ final Watcher watcher, final ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
if(called++ < 1) {
exceptionThrown = true;
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 82938a0..5a2bbb9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -83,6 +83,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.data.Stat;
@@ -914,7 +915,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
mockController = new MockZKController(server.getZkAddress(), "node1", overseers);
- TimeOut timeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeout.hasTimedOut()) {
try {
mockController.createCollection(COLLECTION, 1);
@@ -924,7 +925,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
}
- timeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeout.hasTimedOut()) {
try {
mockController.publishState(COLLECTION, "core1", "node1", "shard1", Replica.State.ACTIVE,
@@ -954,8 +955,8 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
mockController2 = new MockZKController(server.getZkAddress(), "node2", overseers);
-
- timeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+
+ timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeout.hasTimedOut()) {
try {
mockController.publishState(COLLECTION, "core1", "node1", "shard1", Replica.State.ACTIVE,
@@ -969,7 +970,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
verifyShardLeader(reader, COLLECTION, "shard1", "core1");
- timeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeout.hasTimedOut()) {
try {
mockController2.publishState(COLLECTION, "core4", "node2", "shard1", Replica.State.ACTIVE,
@@ -992,11 +993,13 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkCoreNodeProps leaderProps;
try {
- leaderProps = zkController.getLeaderProps(COLLECTION, "shard1", 1000);
- } catch (SolrException e) {
+ leaderProps = zkController.getLeaderProps(COLLECTION, "shard1", 1000, false);
+ } catch (SolrException e) {
return false;
} catch (InterruptedException e) {
throw new RuntimeException(e);
+ } catch (SessionExpiredException e) {
+ return false;
}
if (leaderProps.getCoreName().equals("core4")) {
return true;
@@ -1428,7 +1431,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
return zkClient;
}
- private ZkController createMockZkController(String zkAddress, SolrZkClient zkClient, ZkStateReader reader) throws InterruptedException, NoSuchFieldException, SecurityException {
+ private ZkController createMockZkController(String zkAddress, SolrZkClient zkClient, ZkStateReader reader) throws InterruptedException, NoSuchFieldException, SecurityException, SessionExpiredException {
ZkController zkController = mock(ZkController.class);
if (zkClient == null) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
index ac7449b..fb2af94 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
@@ -94,7 +94,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
boolean success = available.tryAcquire(timeout, unit);
if (success) {
- queue.offer(e);
+ queue.offer(e, timeout, unit);
}
return success;
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 5f93e30..23871a7 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -155,27 +155,27 @@ public class ConnectionManager implements Watcher {
public void update(SolrZooKeeper keeper) {
try {
waitForConnected(Long.MAX_VALUE);
+
+ try {
+ client.updateKeeper(keeper);
+ } catch (InterruptedException e) {
+ closeKeeper(keeper);
+ Thread.currentThread().interrupt();
+ // we must have been asked to stop
+ throw new RuntimeException(e);
+ }
+
+ if (onReconnect != null) {
+ onReconnect.command();
+ }
+
} catch (Exception e1) {
+ // if there was a problem creating the new SolrZooKeeper
+ // or if we cannot run our reconnect command, close the keeper
+ // our retry loop will try to create one again
closeKeeper(keeper);
throw new RuntimeException(e1);
}
-
- log.info("Connection with ZooKeeper reestablished.");
- try {
- client.updateKeeper(keeper);
- } catch (InterruptedException e) {
- closeKeeper(keeper);
- Thread.currentThread().interrupt();
- // we must have been asked to stop
- throw new RuntimeException(e);
- } catch (Exception t) {
- closeKeeper(keeper);
- throw new RuntimeException(t);
- }
-
- if (onReconnect != null) {
- onReconnect.command();
- }
}
});
@@ -231,7 +231,7 @@ public class ConnectionManager implements Watcher {
public synchronized void waitForConnected(long waitForConnection)
throws TimeoutException {
- log.debug("Waiting for client to connect to ZooKeeper");
+ log.info("Waiting for client to connect to ZooKeeper");
long expire = System.nanoTime() + TimeUnit.NANOSECONDS.convert(waitForConnection, TimeUnit.MILLISECONDS);
long left = 1;
while (!connected && left > 0) {
@@ -249,7 +249,7 @@ public class ConnectionManager implements Watcher {
if (!connected) {
throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
}
- log.debug("Client is connected to ZooKeeper");
+ log.info("Client is connected to ZooKeeper");
}
public synchronized void waitForDisconnected(long timeout)
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
index 2ed88e2..85c4b11 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
@@ -21,7 +21,6 @@ import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeoutException;
import org.apache.solr.common.AlreadyClosedException;
-import org.apache.solr.common.SolrException;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +48,7 @@ public class DefaultConnectionStrategy extends ZkClientConnectionStrategy {
@Override
public void reconnect(final String serverAddress, final int zkClientTimeout,
- final Watcher watcher, final ZkUpdate updater) throws IOException {
+ final Watcher watcher, final ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
log.warn("Connection expired - starting a new one...");
SolrZooKeeper zk = createSolrZooKeeper(serverAddress, zkClientTimeout, watcher);
boolean success = false;
@@ -60,9 +59,6 @@ public class DefaultConnectionStrategy extends ZkClientConnectionStrategy {
log.info("Reconnected to ZooKeeper");
} catch (AlreadyClosedException e) {
- } catch (Exception e) {
- SolrException.log(log, "Reconnect to ZooKeeper failed", e);
- log.warn("Reconnect to ZooKeeper failed");
} finally {
if (!success) {
try {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java b/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
index 46aed08..4c86374 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
@@ -16,6 +16,8 @@
*/
package org.apache.solr.common.cloud;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+
/**
* Implementations are expected to implement a correct hashCode and equals
* method needed to uniquely identify the listener as listeners are managed
@@ -24,5 +26,5 @@ package org.apache.solr.common.cloud;
* when it no longer needs to be notified of ZK reconnection events.
*/
public interface OnReconnect {
- void command();
+ void command() throws SessionExpiredException;
}