You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ho...@apache.org on 2019/04/11 21:23:01 UTC
[lucene-solr] branch master updated: SOLR-13393: Fixed
ZkClientClusterStateProvider to prevent risk of leaking
ZkStateReader/threads when processing concurrent requests during shutdown.
This is an automated email from the ASF dual-hosted git repository.
hossman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new 980fd7d SOLR-13393: Fixed ZkClientClusterStateProvider to prevent risk of leaking ZkStateReader/threads when processing concurrent requests during shutdown.
980fd7d is described below
commit 980fd7d5c56349570b2202dedcf7b454216a61b8
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Thu Apr 11 14:22:50 2019 -0700
SOLR-13393: Fixed ZkClientClusterStateProvider to prevent risk of leaking ZkStateReader/threads when processing concurrent requests during shutdown.
This primarily affected tests, but may have also caused odd errors/delays when restarting/shutting down Solr nodes.
---
solr/CHANGES.txt | 4 ++
.../solrj/impl/ZkClientClusterStateProvider.java | 64 ++++++++++++++--------
.../apache/solr/common/cloud/ZkStateReader.java | 9 ++-
.../solrj/impl/TestCloudSolrClientConnections.java | 54 ++++++++++++++++++
4 files changed, 105 insertions(+), 26 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f4eb469..916b897 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -174,6 +174,10 @@ Bug Fixes
* SOLR-13339: Prevent recovery, fetching index being kicked off after SolrCores already closed (Cao Manh Dat)
+* SOLR-13393: Fixed ZkClientClusterStateProvider to prevent risk of leaking ZkStateReader/threads when
+ processing concurrent requests during shutdown. This primarily affected tests, but may have also caused
+ odd errors/delays when restarting/shutting down Solr nodes. (hossman)
+
Improvements
----------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
index e1f33a8..88411c7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
@@ -64,21 +64,17 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
@Override
public ClusterState.CollectionRef getState(String collection) {
- ClusterState clusterState = zkStateReader.getClusterState();
+ ClusterState clusterState = getZkStateReader().getClusterState();
if (clusterState != null) {
return clusterState.getCollectionRef(collection);
} else {
return null;
}
}
- public ZkStateReader getZkStateReader(){
- return zkStateReader;
- }
-
+
@Override
public Set<String> getLiveNodes() {
- if (isClosed) throw new AlreadyClosedException();
- ClusterState clusterState = zkStateReader.getClusterState();
+ ClusterState clusterState = getZkStateReader().getClusterState();
if (clusterState != null) {
return clusterState.getLiveNodes();
} else {
@@ -89,23 +85,23 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
@Override
public List<String> resolveAlias(String alias) {
- return zkStateReader.getAliases().resolveAliases(alias); // if not an alias, returns itself
+ return getZkStateReader().getAliases().resolveAliases(alias); // if not an alias, returns itself
}
@Override
public String resolveSimpleAlias(String alias) throws IllegalArgumentException {
- return zkStateReader.getAliases().resolveSimpleAlias(alias);
+ return getZkStateReader().getAliases().resolveSimpleAlias(alias);
}
@Override
public Object getClusterProperty(String propertyName) {
- Map<String, Object> props = zkStateReader.getClusterProperties();
+ Map<String, Object> props = getZkStateReader().getClusterProperties();
return props.get(propertyName);
}
@Override
public <T> T getClusterProperty(String propertyName, T def) {
- Map<String, Object> props = zkStateReader.getClusterProperties();
+ Map<String, Object> props = getZkStateReader().getClusterProperties();
if (props.containsKey(propertyName)) {
return (T)props.get(propertyName);
}
@@ -114,12 +110,12 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
@Override
public ClusterState getClusterState() throws IOException {
- return zkStateReader.getClusterState();
+ return getZkStateReader().getClusterState();
}
@Override
public Map<String, Object> getClusterProperties() {
- return zkStateReader.getClusterProperties();
+ return getZkStateReader().getClusterProperties();
}
@Override
@@ -135,8 +131,7 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
* @throws IOException if an I/O exception occurs
*/
public void downloadConfig(String configName, Path downloadPath) throws IOException {
- connect();
- zkStateReader.getConfigManager().downloadConfigDir(configName, downloadPath);
+ getZkStateReader().getConfigManager().downloadConfigDir(configName, downloadPath);
}
/**
@@ -151,21 +146,31 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
* @throws IOException if an IO error occurs
*/
public void uploadConfig(Path configPath, String configName) throws IOException {
- connect();
- zkStateReader.getConfigManager().uploadConfigDir(configPath, configName);
+ getZkStateReader().getConfigManager().uploadConfigDir(configPath, configName);
}
@Override
public void connect() {
+ // Essentially a No-Op, but force a check that we're not closed and the ZkStateReader is available...
+ final ZkStateReader ignored = getZkStateReader();
+ }
+
+ public ZkStateReader getZkStateReader() {
+ if (isClosed) { // quick check...
+ throw new AlreadyClosedException();
+ }
if (zkStateReader == null) {
synchronized (this) {
+ if (isClosed) { // while we were waiting for sync lock another thread may have closed
+ throw new AlreadyClosedException();
+ }
if (zkStateReader == null) {
ZkStateReader zk = null;
try {
zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
zk.createClusterStateWatchersAndUpdate();
- zkStateReader = zk;
log.info("Cluster at {} ready", zkHost);
+ zkStateReader = zk;
} catch (InterruptedException e) {
zk.close();
Thread.currentThread().interrupt();
@@ -181,16 +186,22 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
}
}
}
+ return zkStateReader;
}
-
+
@Override
public void close() throws IOException {
- isClosed = true;
- if (zkStateReader != null && closeZkStateReader) {
- synchronized (this) {
- if (zkStateReader != null)
- zkStateReader.close();
+ synchronized (this) {
+ if (false == isClosed && zkStateReader != null) {
+ isClosed = true;
+
+ // force zkStateReader to null first so that any parallel calls drop into the synchronized block
+ // of getZkStateReader() as soon as possible.
+ final ZkStateReader zkToClose = zkStateReader;
zkStateReader = null;
+ if (closeZkStateReader) {
+ zkToClose.close();
+ }
}
}
}
@@ -230,4 +241,9 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
public String toString() {
return zkHost;
}
+
+ @Override
+ public boolean isClosed() {
+ return isClosed;
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 37d6515..cd7e41c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -16,7 +16,6 @@
*/
package org.apache.solr.common.cloud;
-import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
@@ -47,6 +46,7 @@ import java.util.stream.Collectors;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.Callable;
+import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.AutoScalingParams;
@@ -74,7 +74,7 @@ import static java.util.Collections.emptySortedSet;
import static java.util.Collections.unmodifiableSet;
import static org.apache.solr.common.util.Utils.fromJSON;
-public class ZkStateReader implements Closeable {
+public class ZkStateReader implements SolrCloseable {
public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 2000); // delay between cloud state updates
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -851,6 +851,11 @@ public class ZkStateReader implements Closeable {
}
assert ObjectReleaseTracker.release(this);
}
+
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException {
ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, shard, timeout));
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/TestCloudSolrClientConnections.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/TestCloudSolrClientConnections.java
index 2e12022..8d9193f 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/TestCloudSolrClientConnections.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/TestCloudSolrClientConnections.java
@@ -22,8 +22,10 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.junit.Test;
public class TestCloudSolrClientConnections extends SolrTestCaseJ4 {
@@ -89,4 +91,56 @@ public class TestCloudSolrClientConnections extends SolrTestCaseJ4 {
}
}
+ @Test
+ public void testAlreadyClosedClusterStateProvider() throws Exception {
+
+ final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster(1, createTempDir(),
+ buildJettyConfig("/solr"));
+ // from a client perspective the behavior of ZkClientClusterStateProvider should be
+ // consistent regardless of whether it's constructed with a zkhost or an existing ZkStateReader
+ try {
+ final ZkClientClusterStateProvider zkHost_provider
+ = new ZkClientClusterStateProvider(cluster.getZkServer().getZkAddress());
+
+ checkAndCloseProvider(zkHost_provider);
+
+ final ZkStateReader reusedZkReader = new ZkStateReader(cluster.getZkClient());
+ try {
+ reusedZkReader.createClusterStateWatchersAndUpdate();
+ final ZkClientClusterStateProvider reader_provider = new ZkClientClusterStateProvider(reusedZkReader);
+ checkAndCloseProvider(reader_provider);
+
+ // but in the case of a reused ZkStateReader,
+ // closing the provider must not have closed the ZkStateReader...
+ assertEquals(false, reusedZkReader.isClosed());
+
+ } finally {
+ reusedZkReader.close();
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /** NOTE: will close the provider and assert it starts throwing AlreadyClosedException */
+ private void checkAndCloseProvider(final ZkClientClusterStateProvider provider) throws Exception {
+ if (random().nextBoolean()) {
+ // calling connect should be purely optional and affect nothing
+ provider.connect();
+ }
+ assertNotNull(provider.getClusterState());
+
+ provider.close();
+
+ if (random().nextBoolean()) {
+ expectThrows(AlreadyClosedException.class, () -> {
+ provider.connect();
+ });
+ }
+ expectThrows(AlreadyClosedException.class, () -> {
+ Object ignored = provider.getClusterState();
+ });
+
+ }
+
}