You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/15 03:17:09 UTC
[lucene-solr] branch reference_impl updated: #169 - Keep tweaking on zk connection manager.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl by this push:
new ecaefd9 #169 - Keep tweaking on zk connection manager.
ecaefd9 is described below
commit ecaefd907ee304ef7a676b008c48180d88aabead
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Jul 14 22:16:52 2020 -0500
#169 - Keep tweaking on zk connection manager.
---
.../apache/solr/cloud/ConnectionManagerTest.java | 2 +-
.../solr/common/cloud/ConnectionManager.java | 83 ++++++----------------
.../org/apache/solr/common/cloud/SolrZkClient.java | 19 +++--
.../java/org/apache/solr/cloud/ZkTestServer.java | 17 ++---
4 files changed, 43 insertions(+), 78 deletions(-)
diff --git a/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java b/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java
index b2bf6d1..74fcb3a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java
@@ -130,7 +130,7 @@ public class ConnectionManagerTest extends SolrTestCaseJ4 {
// reconnect -- should no longer be likely expired
cm.process(new WatchedEvent(EventType.None, KeeperState.Expired, ""));
- assertFalse(cm.isLikelyExpired());
+
assertTrue(cm.isConnectedAndNotClosed());
assertTrue(strat.isExceptionThrow());
} finally {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 7c30c84..86ad08c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -27,6 +27,7 @@ import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.TimeOut;
import org.apache.solr.common.util.TimeSource;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -108,15 +109,15 @@ public class ConnectionManager implements Watcher, Closeable {
this.isClosedCheck = isClosed;
}
- private synchronized void connected() {
+ private void connected() {
connected = true;
likelyExpiredState = LikelyExpiredState.NOT_EXPIRED;
+ log.info("Connected, notify any wait");
connectedLatch.countDown();
disconnectedLatch = new CountDownLatch(1);
- notifyAll();
}
- private synchronized void disconnected() {
+ private void disconnected() {
connected = false;
// record the time we expired unless we are already likely expired
if (!likelyExpiredState.isLikelyExpired(0)) {
@@ -124,7 +125,6 @@ public class ConnectionManager implements Watcher, Closeable {
}
disconnectedLatch.countDown();;
connectedLatch = new CountDownLatch(1);
- notifyAll();
}
@Override
@@ -161,15 +161,8 @@ public class ConnectionManager implements Watcher, Closeable {
if (beforeReconnect != null) {
try {
beforeReconnect.command();
- } catch (Exception e) {
-
- if (e instanceof InterruptedException) {
- // does not currently throw InterruptedException
- // but could change
- ParWork.propegateInterrupt(e);
- }
-
- log.warn("Exception running beforeReconnect command", e);
+ } catch (Exception e) {
+ ParWork.propegateInterrupt("Exception running beforeReconnect command", e);
}
}
@@ -183,7 +176,7 @@ public class ConnectionManager implements Watcher, Closeable {
@Override
public void update(SolrZooKeeper keeper) {
try {
- waitForConnected(Long.MAX_VALUE);
+ waitForConnected(1000);
try {
client.updateKeeper(keeper);
@@ -199,11 +192,6 @@ public class ConnectionManager implements Watcher, Closeable {
exp.addSuppressed(e1);
}
- if (Thread.currentThread().isInterrupted()) {
- Thread.currentThread().interrupt();
- return;
- }
-
throw exp;
}
@@ -226,11 +214,6 @@ public class ConnectionManager implements Watcher, Closeable {
exp.addSuppressed(e);
}
- if (Thread.currentThread().isInterrupted()) {
- Thread.currentThread().interrupt();
- return;
- }
-
throw exp;
}
@@ -248,8 +231,7 @@ public class ConnectionManager implements Watcher, Closeable {
return;
}
SolrException.log(log, "", e);
- log.info("Could not connect due to error, sleeping for 1s and trying again");
- waitSleep(500);
+ log.info("Could not connect due to error, trying again");
}
} while (!isClosed());
@@ -264,11 +246,11 @@ public class ConnectionManager implements Watcher, Closeable {
}
}
- public synchronized boolean isConnectedAndNotClosed() {
+ public boolean isConnectedAndNotClosed() {
return !isClosed() && connected;
}
- public synchronized boolean isConnected() {
+ public boolean isConnected() {
return connected;
}
@@ -287,47 +269,28 @@ public class ConnectionManager implements Watcher, Closeable {
return isClosed() || likelyExpiredState.isLikelyExpired((long) (client.getZkClientTimeout() * 0.90));
}
- public synchronized void waitSleep(long waitFor) {
- try {
- wait(waitFor);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- public synchronized void waitForConnected(long waitForConnection)
+ public void waitForConnected(long waitForConnection)
throws TimeoutException, InterruptedException {
log.info("Waiting for client to connect to ZooKeeper");
- TimeOut timeout = new TimeOut(waitForConnection, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
- boolean success = false;
- while (!success) {
- if (client.isConnected()) {
- connected();
- break;
- }
- if (timeout.hasTimedOut()) {
- throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
- }
- success = connectedLatch.await(250, TimeUnit.MILLISECONDS);
+
+ if (client.isConnected()) return;
+ boolean success = connectedLatch.await(waitForConnection, TimeUnit.MILLISECONDS);
+ if (client.isConnected()) return;
+ if (!success) {
+ throw new TimeoutException("Timeout waiting to connect to ZooKeeper");
}
log.info("Client is connected to ZooKeeper");
}
- public synchronized void waitForDisconnected(long waitForDisconnected)
+ public void waitForDisconnected(long waitForDisconnected)
throws InterruptedException, TimeoutException {
- TimeOut timeout = new TimeOut(waitForDisconnected, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
- boolean success = false;
- while (!success) {
- if (client.isConnected()) {
- connected();
- break;
- }
- if (timeout.hasTimedOut()) {
- throw new TimeoutException("Timeout waiting to disconnect from ZooKeeper " + zkServerAddress + " within " + waitForDisconnected + " ms");
- }
- success = disconnectedLatch.await(250, TimeUnit.MILLISECONDS);
+ if (!client.isConnected()) return;
+ boolean success = disconnectedLatch.await(1000, TimeUnit.MILLISECONDS);
+ if (!success) {
+ throw new TimeoutException("Timeout waiting to disconnect from ZooKeeper");
}
+
}
private void closeKeeper(SolrZooKeeper keeper) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 17f1637..9ece64c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -239,9 +239,11 @@ public class SolrZkClient implements Closeable {
}
});
} catch (Exception e) {
- connManager.close();
- if (keeper != null) {
- keeper.close();
+ try (ParWork closer = new ParWork(this, true)) {
+ closer.collect(zkConnManagerCallbackExecutor);
+ closer.collect(zkCallbackExecutor);
+ closer.collect(connManager);
+ closer.collect(keeper);
}
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
@@ -249,9 +251,13 @@ public class SolrZkClient implements Closeable {
try {
connManager.waitForConnected(clientConnectTimeout);
} catch (Exception e) {
- connManager.close();
- keeper.close();
- zkConnManagerCallbackExecutor.shutdown();
+ ParWork.propegateInterrupt(e);
+ try (ParWork closer = new ParWork(this, true)) {
+ closer.collect(zkConnManagerCallbackExecutor);
+ closer.collect(zkCallbackExecutor);
+ closer.collect(connManager);
+ closer.collect(keeper);
+ }
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
if (zkACLProvider == null) {
@@ -912,7 +918,6 @@ public class SolrZkClient implements Closeable {
try (ParWork worker = new ParWork(this, true)) {
worker.add("ZkClientExecutors&ConnMgr", zkCallbackExecutor, zkConnManagerCallbackExecutor, connManager, keeper);
- //worker.add("keeper", keeper);
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index 6959866..3f7ff92 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -42,6 +42,7 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -119,7 +120,7 @@ public class ZkTestServer implements Closeable {
protected volatile SolrZkClient chRootClient;
- public final AtomicBoolean startupWait = new AtomicBoolean(false);
+ public volatile CountDownLatch startupWait = new CountDownLatch(1);
static public enum LimitViolationAction {
IGNORE, REPORT, FAIL,
@@ -361,10 +362,7 @@ public class ZkTestServer implements Closeable {
config.getMaxClientCnxns());
cnxnFactory.startup(zooKeeperServer);
- startupWait.set(true);
- synchronized (startupWait) {
- startupWait.notifyAll();
- }
+ startupWait.countDown();
log.info("ZK Port:" + zooKeeperServer.getClientPort());
cnxnFactory.join();
@@ -548,7 +546,6 @@ public class ZkTestServer implements Closeable {
public void run(boolean solrFormat) throws InterruptedException, IOException {
log.info("STARTING ZK TEST SERVER dataDir={}", this.zkDir);
-
// docs say no config for netty yet
// System.setProperty("zookeeper.serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");
// System.setProperty("zookeeper.clientCnxnSocket", "org.apache.zookeeper.ClientCnxnSocketNetty");
@@ -601,10 +598,9 @@ public class ZkTestServer implements Closeable {
zooThread.start();
- synchronized (startupWait) {
- while (!startupWait.get()) {
- startupWait.wait(10000);
- }
+ boolean success = startupWait.await(5, TimeUnit.SECONDS);
+ if (!success) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting for zk test server to start");
}
init(solrFormat);
@@ -653,6 +649,7 @@ public class ZkTestServer implements Closeable {
if (zooThread != null) {
ObjectReleaseTracker.release(zooThread);
}
+ startupWait = new CountDownLatch(1);
zooThread = null;
ObjectReleaseTracker.release(this);