You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ho...@apache.org on 2019/04/11 21:23:01 UTC

[lucene-solr] branch master updated: SOLR-13393: Fixed ZkClientClusterStateProvider to prevent risk of leaking ZkStateReader/threads when processing concurrent requests during shutdown.

This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 980fd7d  SOLR-13393: Fixed ZkClientClusterStateProvider to prevent risk of leaking ZkStateReader/threads when processing concurrent requests during shutdown.
980fd7d is described below

commit 980fd7d5c56349570b2202dedcf7b454216a61b8
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Thu Apr 11 14:22:50 2019 -0700

    SOLR-13393: Fixed ZkClientClusterStateProvider to prevent risk of leaking ZkStateReader/threads when processing concurrent requests during shutdown.
    
    This primarily affected tests, but may have also caused odd errors/delays when restarting/shutting down Solr nodes.
---
 solr/CHANGES.txt                                   |  4 ++
 .../solrj/impl/ZkClientClusterStateProvider.java   | 64 ++++++++++++++--------
 .../apache/solr/common/cloud/ZkStateReader.java    |  9 ++-
 .../solrj/impl/TestCloudSolrClientConnections.java | 54 ++++++++++++++++++
 4 files changed, 105 insertions(+), 26 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f4eb469..916b897 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -174,6 +174,10 @@ Bug Fixes
 
 * SOLR-13339: Prevent recovery, fetching index being kicked off after SolrCores already closed (Cao Manh Dat)
 
+* SOLR-13393: Fixed ZkClientClusterStateProvider to prevent risk of leaking ZkStateReader/threads when
+  processing concurrent requests during shutdown.  This primarily affected tests, but may have also caused
+  odd errors/delays when restarting/shutting down Solr nodes.  (hossman)
+
 Improvements
 ----------------------
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
index e1f33a8..88411c7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
@@ -64,21 +64,17 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
 
   @Override
   public ClusterState.CollectionRef getState(String collection) {
-    ClusterState clusterState = zkStateReader.getClusterState();
+    ClusterState clusterState = getZkStateReader().getClusterState();
     if (clusterState != null) {
       return clusterState.getCollectionRef(collection);
     } else {
       return null;
     }
   }
-  public ZkStateReader getZkStateReader(){
-    return zkStateReader;
-  }
-
+  
   @Override
   public Set<String> getLiveNodes() {
-    if (isClosed) throw new AlreadyClosedException();
-    ClusterState clusterState = zkStateReader.getClusterState();
+    ClusterState clusterState = getZkStateReader().getClusterState();
     if (clusterState != null) {
       return clusterState.getLiveNodes();
     } else {
@@ -89,23 +85,23 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
 
   @Override
   public List<String> resolveAlias(String alias) {
-    return zkStateReader.getAliases().resolveAliases(alias); // if not an alias, returns itself
+    return getZkStateReader().getAliases().resolveAliases(alias); // if not an alias, returns itself
   }
 
   @Override
   public String resolveSimpleAlias(String alias) throws IllegalArgumentException {
-    return zkStateReader.getAliases().resolveSimpleAlias(alias);
+    return getZkStateReader().getAliases().resolveSimpleAlias(alias);
   }
 
   @Override
   public Object getClusterProperty(String propertyName) {
-    Map<String, Object> props = zkStateReader.getClusterProperties();
+    Map<String, Object> props = getZkStateReader().getClusterProperties();
     return props.get(propertyName);
   }
 
   @Override
   public <T> T getClusterProperty(String propertyName, T def) {
-    Map<String, Object> props = zkStateReader.getClusterProperties();
+    Map<String, Object> props = getZkStateReader().getClusterProperties();
     if (props.containsKey(propertyName)) {
       return (T)props.get(propertyName);
     }
@@ -114,12 +110,12 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
 
   @Override
   public ClusterState getClusterState() throws IOException {
-    return zkStateReader.getClusterState();
+    return getZkStateReader().getClusterState();
   }
 
   @Override
   public Map<String, Object> getClusterProperties() {
-    return zkStateReader.getClusterProperties();
+    return getZkStateReader().getClusterProperties();
   }
 
   @Override
@@ -135,8 +131,7 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
    * @throws IOException  if an I/O exception occurs
    */
   public void downloadConfig(String configName, Path downloadPath) throws IOException {
-    connect();
-    zkStateReader.getConfigManager().downloadConfigDir(configName, downloadPath);
+    getZkStateReader().getConfigManager().downloadConfigDir(configName, downloadPath);
   }
 
   /**
@@ -151,21 +146,31 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
    * @throws IOException if an IO error occurs
    */
   public void uploadConfig(Path configPath, String configName) throws IOException {
-    connect();
-    zkStateReader.getConfigManager().uploadConfigDir(configPath, configName);
+    getZkStateReader().getConfigManager().uploadConfigDir(configPath, configName);
   }
 
   @Override
   public void connect() {
+    // Essentially a no-op, but force a check that we're not closed and the ZkStateReader is available...
+    final ZkStateReader ignored = getZkStateReader();
+  }
+  
+  public ZkStateReader getZkStateReader() {
+    if (isClosed) { // quick check...
+      throw new AlreadyClosedException();
+    }
     if (zkStateReader == null) {
       synchronized (this) {
+        if (isClosed) { // while we were waiting for sync lock another thread may have closed
+          throw new AlreadyClosedException();
+        }
         if (zkStateReader == null) {
           ZkStateReader zk = null;
           try {
             zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
             zk.createClusterStateWatchersAndUpdate();
-            zkStateReader = zk;
             log.info("Cluster at {} ready", zkHost);
+            zkStateReader = zk;
           } catch (InterruptedException e) {
             zk.close();
             Thread.currentThread().interrupt();
@@ -181,16 +186,22 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
         }
       }
     }
+    return zkStateReader;
   }
-
+  
   @Override
   public void close() throws IOException {
-    isClosed  = true;
-    if (zkStateReader != null && closeZkStateReader) {
-      synchronized (this) {
-        if (zkStateReader != null)
-          zkStateReader.close();
+    synchronized (this) {
+      if (false == isClosed && zkStateReader != null) {
+        isClosed = true;
+        
+        // force zkStateReader to null first so that any parallel calls drop into the synchronized
+        // block of getZkStateReader() as soon as possible.
+        final ZkStateReader zkToClose = zkStateReader;
         zkStateReader = null;
+        if (closeZkStateReader) {
+          zkToClose.close();
+        }
       }
     }
   }
@@ -230,4 +241,9 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
   public String toString() {
     return zkHost;
   }
+
+  @Override
+  public boolean isClosed() {
+    return isClosed;
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 37d6515..cd7e41c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -16,7 +16,6 @@
  */
 package org.apache.solr.common.cloud;
 
-import java.io.Closeable;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -47,6 +46,7 @@ import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.Callable;
+import org.apache.solr.common.SolrCloseable;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.params.AutoScalingParams;
@@ -74,7 +74,7 @@ import static java.util.Collections.emptySortedSet;
 import static java.util.Collections.unmodifiableSet;
 import static org.apache.solr.common.util.Utils.fromJSON;
 
-public class ZkStateReader implements Closeable {
+public class ZkStateReader implements SolrCloseable {
   public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 2000);  // delay between cloud state updates
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   
@@ -851,6 +851,11 @@ public class ZkStateReader implements Closeable {
     }
     assert ObjectReleaseTracker.release(this);
   }
+
+  @Override
+  public boolean isClosed() {
+    return closed;
+  }
   
   public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException {
     ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, shard, timeout));
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/TestCloudSolrClientConnections.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/TestCloudSolrClientConnections.java
index 2e12022..8d9193f 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/TestCloudSolrClientConnections.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/TestCloudSolrClientConnections.java
@@ -22,8 +22,10 @@ import java.util.concurrent.TimeUnit;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.junit.Test;
 
 public class TestCloudSolrClientConnections extends SolrTestCaseJ4 {
@@ -89,4 +91,56 @@ public class TestCloudSolrClientConnections extends SolrTestCaseJ4 {
     }
   }
 
+  @Test
+  public void testAlreadyClosedClusterStateProvider() throws Exception {
+    
+    final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster(1, createTempDir(),
+                                                                  buildJettyConfig("/solr"));
+    // from a client perspective the behavior of ZkClientClusterStateProvider should be
+    // consistent regardless of whether it's constructed with a zkhost or an existing ZkStateReader
+    try {
+      final ZkClientClusterStateProvider zkHost_provider
+        = new ZkClientClusterStateProvider(cluster.getZkServer().getZkAddress());
+      
+      checkAndCloseProvider(zkHost_provider);
+      
+      final ZkStateReader reusedZkReader = new ZkStateReader(cluster.getZkClient());
+      try {
+        reusedZkReader.createClusterStateWatchersAndUpdate();
+        final ZkClientClusterStateProvider reader_provider = new ZkClientClusterStateProvider(reusedZkReader);
+        checkAndCloseProvider(reader_provider);
+        
+        // but in the case of a reused ZkStateReader,
+        // closing the provider must not have closed the ZkStateReader...
+        assertEquals(false, reusedZkReader.isClosed());
+        
+      } finally {
+        reusedZkReader.close();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /** NOTE: will close the provider and assert it starts throwing AlreadyClosedException */
+  private void checkAndCloseProvider(final ZkClientClusterStateProvider provider) throws Exception {
+    if (random().nextBoolean()) {
+      // calling connect should be purely optional and affect nothing
+      provider.connect();
+    }
+    assertNotNull(provider.getClusterState());
+
+    provider.close();
+
+    if (random().nextBoolean()) {
+      expectThrows(AlreadyClosedException.class, () -> {
+          provider.connect();
+        });
+    }
+    expectThrows(AlreadyClosedException.class, () -> {
+        Object ignored = provider.getClusterState();
+      });
+    
+  }
+
 }