You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/10 01:43:51 UTC

[lucene-solr] 07/09: @1334 More cleanup and correctness.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 1d4378959d9158bc92c480ddf8f7c11b1c096c5e
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Feb 4 19:11:22 2021 -0600

    @1334 More cleanup and correctness.
    
    # Conflicts:
    #	solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
---
 .../client/solrj/embedded/JettySolrRunner.java     |  19 +-
 .../java/org/apache/solr/cloud/LeaderElector.java  |   4 +-
 .../OverseerCollectionMessageHandler.java          | 100 ++--
 .../src/java/org/apache/solr/core/SolrCore.java    |  25 +
 .../org/apache/solr/handler/SchemaHandler.java     |   4 +-
 .../solr/schema/ManagedIndexSchemaFactory.java     |  23 +-
 .../apache/solr/schema/ZkIndexSchemaReader.java    |  60 +--
 .../org/apache/solr/update/SolrCmdDistributor.java |   4 -
 .../org/apache/solr/update/UpdateShardHandler.java |   2 +-
 .../AddSchemaFieldsUpdateProcessorFactory.java     |   8 +-
 .../solr/handler/component/SearchHandlerTest.java  |   2 +-
 .../org/apache/solr/schema/SchemaWatcherTest.java  |  50 --
 .../test/org/apache/solr/util/AuthToolTest.java    |   2 +
 .../org/apache/solr/common/ParWorkExecutor.java    |   2 +-
 .../solr/common/cloud/ConnectionManager.java       |  12 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java |   3 +-
 .../apache/solr/common/cloud/SolrZooKeeper.java    |   2 +-
 .../apache/solr/common/cloud/ZkCmdExecutor.java    |   5 +-
 .../apache/solr/common/cloud/ZkStateReader.java    | 508 ++++++++++++---------
 .../java/org/apache/solr/cloud/ZkTestServer.java   |   2 +-
 20 files changed, 441 insertions(+), 396 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 34faa29..ad6b18c 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -25,12 +25,12 @@ import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.SolrQueuedThreadPool;
 import org.apache.solr.common.util.SolrScheduledExecutorScheduler;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.servlet.SolrDispatchFilter;
-import org.apache.solr.servlet.SolrQoSFilter;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -573,7 +573,7 @@ public class JettySolrRunner implements Closeable {
         SolrZkClient zkClient = getCoreContainer().getZkController().getZkClient();
         CountDownLatch latch = new CountDownLatch(1);
 
-        Watcher watcher = new ClusterReadyWatcher(latch, zkClient);
+        ClusterReadyWatcher watcher = new ClusterReadyWatcher(latch, zkClient);
         try {
           Stat stat = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, watcher);
           if (stat == null) {
@@ -594,6 +594,8 @@ public class JettySolrRunner implements Closeable {
         } catch (InterruptedException e) {
           ParWork.propagateInterrupt(e);
           throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
+        } finally {
+          IOUtils.closeQuietly(watcher);
         }
 // if we need this, us client, not reader
 //        log.info("waitForNode: {}", getNodeName());
@@ -927,7 +929,7 @@ public class JettySolrRunner implements Closeable {
     return proxy;
   }
 
-  private static class ClusterReadyWatcher implements Watcher {
+  private static class ClusterReadyWatcher implements Watcher, Closeable {
 
     private final CountDownLatch latch;
     private final SolrZkClient zkClient;
@@ -959,5 +961,16 @@ public class JettySolrRunner implements Closeable {
         }
       }
     }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        zkClient.getSolrZooKeeper().removeWatches(ZkStateReader.COLLECTIONS_ZKNODE, this, WatcherType.Any, true);
+      } catch (KeeperException.NoWatcherException e) {
+
+      } catch (Exception e) {
+        if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
+      }
+    }
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 877f904..93d7fd2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -521,6 +521,7 @@ public class LeaderElector implements Closeable {
   private class ElectionWatcher implements Watcher, Closeable {
     final String myNode, watchedNode;
     final ElectionContext context;
+    private volatile boolean closed;
 
     private ElectionWatcher(String myNode, String watchedNode, ElectionContext context) {
       this.myNode = myNode;
@@ -531,7 +532,7 @@ public class LeaderElector implements Closeable {
     @Override
     public void process(WatchedEvent event) {
       // session events are not change events, and do not remove the watcher
-      if (EventType.None.equals(event.getType())) {
+      if (EventType.None.equals(event.getType()) || closed) {
         return;
       }
 
@@ -577,6 +578,7 @@ public class LeaderElector implements Closeable {
 
     @Override
     public void close() throws IOException {
+      this.closed = true;
       SolrZooKeeper zk = zkClient.getSolrZooKeeper();
       try {
         zk.removeWatches(watchedNode, this, WatcherType.Any, true);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 56fdb00..e495380 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -40,7 +40,6 @@ import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.overseer.CollectionMutator;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.cloud.overseer.ZkStateWriter;
-import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrCloseable;
 import org.apache.solr.common.SolrException;
@@ -50,6 +49,8 @@ import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.SolrZooKeeper;
 import org.apache.solr.common.cloud.ZkConfigManager;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -58,6 +59,7 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.StrUtils;
@@ -73,6 +75,7 @@ import org.apache.solr.logging.MDCLoggingContext;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -129,6 +132,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.SP
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonParams.NAME;
 import static org.apache.solr.common.util.Utils.makeMap;
+import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -880,51 +884,28 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     CountDownLatch latch = new CountDownLatch(1);
     latches.add(latch);
     // mn- from DistributedMap
-    final String sucessAsyncPathToWaitOn = "/overseer/collection-map-completed" + "/mn-" + requestId;
+    final String successPath = "/overseer/collection-map-completed" + "/mn-" + requestId;
     final String failAsyncPathToWaitOn = "/overseer/collection-map-failure" + "/mn-" + requestId;
     final String runningAsyncPathToWaitOn = "/overseer/collection-map-running" + "/mn-" + requestId;
 
     if (zkController.getOverseerRunningMap().contains(requestId)) {
+      WatchForResponseNode waitForResponse = new WatchForResponseNode(latch, zkStateReader.getZkClient(), successPath);
       try {
-
-        Watcher waitForAsyncId = event -> {
-          if (log.isDebugEnabled()) log.debug("waitForAsyncId {}", event);
-          if (Watcher.Event.EventType.None.equals(event.getType())) {
-            return;
-          }
-          if (event.getType().equals(Watcher.Event.EventType.NodeCreated)) {
-            if (log.isDebugEnabled()) log.debug("Async response zk node created");
-            latch.countDown();
-            return;
-          } else if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
-            if (log.isDebugEnabled()) log.debug("Async response zk node deleted");
-            latch.countDown();
-            return;
-          }
-        };
-
-        Stat rstats = zkStateReader.getZkClient().exists(sucessAsyncPathToWaitOn, waitForAsyncId);
-        if (log.isDebugEnabled()) log.debug("created watch for async response, stat={}", rstats);
-        if (rstats != null) {
+        Stat rstats1 = zkStateReader.getZkClient().exists(successPath, waitForResponse);
+        if (log.isDebugEnabled()) log.debug("created watch for async response, stat={}", rstats1);
+        if (rstats1 != null) {
           latch.countDown();
         }
 
-        rstats = zkStateReader.getZkClient().exists(failAsyncPathToWaitOn, waitForAsyncId);
-        if (log.isDebugEnabled()) log.debug("created watch for async response, stat={}", rstats);
-        if (rstats != null) {
+        Stat rstats2 = zkStateReader.getZkClient().exists(failAsyncPathToWaitOn, waitForResponse);
+        if (log.isDebugEnabled()) log.debug("created watch for async response, stat={}", rstats2);
+        if (rstats2 != null) {
           latch.countDown();
         }
 
-        if (overseer.isClosed()) {
-          throw new AlreadyClosedException();
-        }
-
-        if (log.isDebugEnabled()) log.debug("created watch for async response {}", requestId);
+        if (log.isDebugEnabled()) log.debug("created watch for response {}", requestId);
         boolean success = false;
         for (int i = 0; i < 5; i++) {
-          if (overseer.isClosed() || overseer.getCoreContainer().isShutDown()) {
-            break;
-          }
           success = latch.await(3, TimeUnit.SECONDS); // nocommit - still need a central timeout strat
           if (success) {
             if (log.isDebugEnabled()) log.debug("latch was triggered {}", requestId);
@@ -935,10 +916,12 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         }
 
         if (!success) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Timeout waiting to see async zk node " + sucessAsyncPathToWaitOn);
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Timeout waiting to see async zk node " + successPath);
         }
 
       } finally {
+        IOUtils.closeQuietly(waitForResponse);
+        latch.countDown();
         latches.remove(latch);
       }
 
@@ -975,7 +958,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       results.add("STATUS", status);
     }
 
-    String r = ((NamedList<String>)srsp.getValues().get("STATUS")).get("state").toLowerCase(Locale.ROOT);
+    String r = ((NamedList<String>) srsp.getValues().get("STATUS")).get("state").toLowerCase(Locale.ROOT);
     if (r.equals("running")) {
       if (log.isDebugEnabled()) log.debug("The task is still RUNNING, continuing to wait.");
       throw new SolrException(ErrorCode.BAD_REQUEST,
@@ -1208,4 +1191,51 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       shardAsyncIdByNode.put(nodeName, coreAdminAsyncId);
     }
   }
+
+  private static class WatchForResponseNode implements Watcher, Closeable {
+    private final CountDownLatch latch;
+    private final SolrZkClient zkClient;
+    private final String watchPath;
+    private boolean closed;
+
+    public WatchForResponseNode(CountDownLatch latch, SolrZkClient zkClient, String watchPath) {
+      this.zkClient = zkClient;
+      this.latch = latch;
+      this.watchPath = watchPath;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      if (log.isDebugEnabled()) log.debug("waitForAsyncId {}", event);
+      if (Event.EventType.None.equals(event.getType()) || closed) {
+        return;
+      }
+      if (event.getType().equals(Event.EventType.NodeCreated)) {
+        if (log.isDebugEnabled()) log.debug("Overseer request response zk node created");
+        latch.countDown();
+        return;
+      } else if (event.getType().equals(Event.EventType.NodeDeleted)) {
+        if (log.isDebugEnabled()) log.debug("Overseer request response zk node deleted");
+        latch.countDown();
+        return;
+      } else if (event.getType().equals(Event.EventType.NodeDataChanged)) {
+        if (log.isDebugEnabled()) log.debug("Overseer request response zk node data changed");
+        latch.countDown();
+        return;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      this.closed = true;
+      SolrZooKeeper zk = zkClient.getSolrZooKeeper();
+      try {
+        zk.removeWatches(watchPath, this, WatcherType.Any, true);
+      } catch (KeeperException.NoWatcherException e) {
+
+      } catch (Exception e) {
+        log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
+      }
+    }
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 2856347..91fcbe6 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -95,6 +95,7 @@ import org.apache.solr.schema.FieldType;
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.schema.ManagedIndexSchema;
 import org.apache.solr.schema.SimilarityFactory;
+import org.apache.solr.schema.ZkIndexSchemaReader;
 import org.apache.solr.search.QParserPlugin;
 import org.apache.solr.search.SolrFieldCacheBean;
 import org.apache.solr.search.SolrIndexSearcher;
@@ -239,6 +240,8 @@ public final class SolrCore implements SolrInfoBean, Closeable {
   private volatile Counter newSearcherOtherErrorsCounter;
   private final CoreContainer coreContainer;
 
+  private volatile ZkIndexSchemaReader zkIndexSchemaReader;
+
   private final Set<String> metricNames = ConcurrentHashMap.newKeySet(64);
   private final String metricTag = SolrMetricProducer.getUniqueMetricTag(this, null);
   private volatile SolrMetricsContext solrMetricsContext;
@@ -1281,6 +1284,22 @@ public final class SolrCore implements SolrInfoBean, Closeable {
       StopWatch timeRegConfListener = new StopWatch(this + "-startCore-regConfListener");
       registerConfListener();
       timeRegConfListener.done();
+
+      if (coreContainer.isZooKeeperAware() && schema instanceof ManagedIndexSchema) {
+        try {
+          this.zkIndexSchemaReader = new ZkIndexSchemaReader(((ManagedIndexSchema) schema).getManagedIndexSchemaFactory(), this);
+        } catch (KeeperException.NoNodeException e) {
+          // no managed schema file yet
+        } catch (KeeperException e) {
+          String msg = "Exception creating ZkIndexSchemaReader";
+          log.error(msg, e);
+          throw new SolrException(ErrorCode.SERVER_ERROR, msg, e);
+        } catch (InterruptedException e) {
+          ParWork.propagateInterrupt(e);
+          throw new SolrException(ErrorCode.SERVER_ERROR, e);
+        }
+      }
+
     } catch(Exception e) {
 //      try {
 //        close();
@@ -1863,6 +1882,8 @@ public final class SolrCore implements SolrInfoBean, Closeable {
 
         searcherExecutor.shutdown();
 
+        closer.collect(zkIndexSchemaReader);
+
         closer.collect("closeSearcher", () -> {
           closeSearcher();
         });
@@ -3175,6 +3196,10 @@ public final class SolrCore implements SolrInfoBean, Closeable {
     return "responseWriters";
   }
 
+  public ZkIndexSchemaReader getZkIndexSchemaReader() {
+    return zkIndexSchemaReader;
+  }
+
   public interface RawWriter {
     default String getContentType() {
       return BinaryResponseParser.BINARY_CONTENT_TYPE;
diff --git a/solr/core/src/java/org/apache/solr/handler/SchemaHandler.java b/solr/core/src/java/org/apache/solr/handler/SchemaHandler.java
index df71dc9..3cb0390 100644
--- a/solr/core/src/java/org/apache/solr/handler/SchemaHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/SchemaHandler.java
@@ -169,8 +169,8 @@ public class SchemaHandler extends RequestHandlerBase implements SolrCoreAware,
             if (refreshIfBelowVersion != -1 && zkVersion < refreshIfBelowVersion) {
               log.info("REFRESHING SCHEMA (refreshIfBelowVersion={}, currentVersion={}) before returning version!"
                   , refreshIfBelowVersion, zkVersion);
-              ZkIndexSchemaReader zkIndexSchemaReader =  managed.getManagedIndexSchemaFactory().getZkIndexSchemaReader();
-              managed = (ManagedIndexSchema) zkIndexSchemaReader.updateSchema();
+              ZkIndexSchemaReader zkIndexSchemaReader =  req.getCore().getZkIndexSchemaReader();
+              managed = (ManagedIndexSchema) zkIndexSchemaReader.updateSchema(false);
               zkVersion = managed.getSchemaZkVersion();
             }
           }
diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
index 9946cc5..0887047 100644
--- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
+++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
@@ -70,8 +70,6 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
   private volatile String resourceName;
   private volatile ManagedIndexSchema schema;
 
-  private volatile ZkIndexSchemaReader zkIndexSchemaReader;
-
   private volatile String loadedResource;
   private volatile boolean shouldUpgrade = false;
 
@@ -427,22 +425,7 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
     this.core = core;
     this.collection = core.getCoreDescriptor().getCollectionName();
     this.cc = core.getCoreContainer();
-    if (this.zkIndexSchemaReader == null && loader instanceof ZkSolrResourceLoader) {
-      try {
-        this.zkIndexSchemaReader = new ZkIndexSchemaReader(this, core);
-        core.setLatestSchema(getSchema());
-      } catch (KeeperException.NoNodeException e) {
-        // no managed schema file yet
-      } catch (KeeperException e) {
-        String msg = "Error attempting to access " + ((ZkSolrResourceLoader)loader).getConfigSetZkPath() + "/" + managedSchemaResourceName;
-        log.error(msg, e);
-        throw new SolrException(ErrorCode.SERVER_ERROR, msg, e);
-      } catch (InterruptedException e) {
-        ParWork.propagateInterrupt(e);
-      }
-    } else {
-      this.zkIndexSchemaReader = null;
-    }
+    core.setLatestSchema(getSchema());
   }
 
   public ManagedIndexSchema getSchema() {
@@ -463,8 +446,4 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
   public SolrConfig getConfig() {
     return config;
   }
-
-  public ZkIndexSchemaReader getZkIndexSchemaReader() {
-    return zkIndexSchemaReader;
-  }
 }
diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
index 83245aa..652fffb 100644
--- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
@@ -16,12 +16,11 @@
  */
 package org.apache.solr.schema;
 
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkSolrResourceLoader;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.core.CloseHook;
-import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -39,9 +38,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 /** Keeps a ManagedIndexSchema up-to-date when changes are made to the serialized managed schema in ZooKeeper */
-public class ZkIndexSchemaReader implements OnReconnect {
+public class ZkIndexSchemaReader implements OnReconnect, Closeable {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final ManagedIndexSchemaFactory managedIndexSchemaFactory;
+  private final ZkController zkController;
   private volatile SolrZkClient zkClient;
   private final String managedSchemaPath;
   private final String uniqueCoreId; // used in equals impl to uniquely identify the core that we're dependent on
@@ -55,34 +55,12 @@ public class ZkIndexSchemaReader implements OnReconnect {
     this.managedSchemaPath = zkLoader.getConfigSetZkPath() + "/" + managedIndexSchemaFactory.getManagedSchemaResourceName();
     this.uniqueCoreId = solrCore.getName()+":"+solrCore.getStartNanoTime();
     this.collection = solrCore.getCoreDescriptor().getCollectionName();
-
-    // register a CloseHook for the core this reader is linked to, so that we can de-register the listener
-    solrCore.addCloseHook(new CloseHook() {
-      @Override
-      public void preClose(SolrCore core) {
-        CoreContainer cc = core.getCoreContainer();
-        if (cc.isZooKeeperAware()) {
-          if (log.isDebugEnabled()) {
-            log.debug("Removing ZkIndexSchemaReader OnReconnect listener as core {} is shutting down.", core.getName());
-          }
-
-          cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this);
-        }
-      }
-
-      @Override
-      public void postClose(SolrCore core) {
-        IOUtils.closeQuietly(schemaWatcher);
-      //  schemaWatcher = null;
-     //   ZkIndexSchemaReader.this.managedIndexSchemaFactory = null;
-     //   zkClient = null;
-
-      }
-    });
-
-    updateSchema();
+    this.zkController = solrCore.getCoreContainer().getZkController();
 
     solrCore.getCoreContainer().getZkController().addOnReconnectListener(this);
+
+    schemaWatcher = new SchemaWatcher(this);
+    updateSchema(true);
   }
 
   public ReentrantLock getSchemaUpdateLock() {
@@ -93,16 +71,12 @@ public class ZkIndexSchemaReader implements OnReconnect {
     return managedIndexSchemaFactory.getSchema();
   }
 
-  /**
-   * Creates a schema watcher and returns it for controlling purposes.
-   *
-   */
-  public void createSchemaWatcher() {
-    if (log.isDebugEnabled()) log.debug("Creating ZooKeeper watch for the managed schema at {}", managedSchemaPath);
+  @Override
+  public void close() throws IOException {
+    zkController.removeOnReconnectListener(ZkIndexSchemaReader.this);
     IOUtils.closeQuietly(schemaWatcher);
-    schemaWatcher = new SchemaWatcher(this);
   }
-  
+
   /**
    * Watches for schema changes and triggers updates in the {@linkplain ZkIndexSchemaReader}.
    */
@@ -122,7 +96,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
       }
       log.info("A schema change: {}, has occurred - updating schema from ZooKeeper ...", event);
       try {
-        schemaReader.updateSchema();
+        schemaReader.updateSchema(true);
       } catch (Exception e) {
         log.error("", e);
       }
@@ -146,13 +120,13 @@ public class ZkIndexSchemaReader implements OnReconnect {
 //  }
 
   // package visibility for test purposes
-  public IndexSchema updateSchema() throws KeeperException, InterruptedException {
+  public IndexSchema updateSchema(boolean createWatch) throws KeeperException, InterruptedException {
     ManagedIndexSchema newSchema;
     ReentrantLock  lock = getSchemaUpdateLock();
     lock.lock();
     try {
       Stat stat = new Stat();
-      createSchemaWatcher();
+
       Stat exists = zkClient.exists(managedSchemaPath, schemaWatcher, true);
       if (exists == null) {
         log.info("{} does not exist yet, watching ...}", managedSchemaPath);
@@ -171,9 +145,9 @@ public class ZkIndexSchemaReader implements OnReconnect {
 
         return null;
       }
-
+      Watcher watcher = (createWatch ? schemaWatcher : null);
       long start = System.nanoTime();
-      byte[] data = zkClient.getData(managedSchemaPath, this.schemaWatcher, stat, true);
+      byte[] data = zkClient.getData(managedSchemaPath, watcher, stat, true);
 
       InputSource inputSource = new InputSource(new ByteArrayInputStream(data));
       String resourceName = managedIndexSchemaFactory.getManagedSchemaResourceName();
@@ -200,7 +174,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
   public void command() {
     try {
       // force update now as the schema may have changed while our zk session was expired
-      updateSchema();
+      updateSchema(false);
     } catch (Exception exc) {
       log.error("Failed to update managed-schema watcher after session expiration due to: {}", exc);
     }
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index b779bdaf8..afaa236 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -124,10 +124,6 @@ public class SolrCmdDistributor implements Closeable {
 
     // this can happen in certain situations such as close
     if (isRetry) {
-      if (rspCode == 403 || rspCode == 503) {
-        doRetry = true;
-      }
-
       // if it's a io exception exception, lets try again
       if (err.t instanceof SolrServerException) {
         if (((SolrServerException) err.t).getRootCause() instanceof IOException  && !(((SolrServerException) err.t).getRootCause() instanceof ClosedChannelException)) {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 45b7891..a9179c4 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -233,7 +233,7 @@ public class UpdateShardHandler implements SolrInfoBean {
   }
 
   public void close() {
-    if (closeTracker != null) closeTracker.close();
+    assert closeTracker != null ? closeTracker.close() : true;
     if (updateOnlyClient != null) updateOnlyClient.disableCloseLock();
     if (recoveryOnlyClient != null) recoveryOnlyClient.disableCloseLock();
     if (searchOnlyClient != null) searchOnlyClient.disableCloseLock();
diff --git a/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java
index 14d7ab2..a7aca6e 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java
@@ -154,7 +154,7 @@ public class AddSchemaFieldsUpdateProcessorFactory extends UpdateRequestProcesso
   public UpdateRequestProcessor getInstance(SolrQueryRequest req, 
                                             SolrQueryResponse rsp, 
                                             UpdateRequestProcessor next) {
-    return new AddSchemaFieldsUpdateProcessor(next, typeMappings, inclusions, exclusions, solrResourceLoader, defaultFieldType);
+    return new AddSchemaFieldsUpdateProcessor(next, typeMappings, inclusions, exclusions, solrResourceLoader, defaultFieldType, req);
   }
 
   @Override
@@ -375,15 +375,17 @@ public class AddSchemaFieldsUpdateProcessorFactory extends UpdateRequestProcesso
     private final SolrResourceLoader solrResourceLoader;
     private final List<TypeMapping> typeMappings;
     private final String defaultFieldType;
+    private final SolrQueryRequest req;
 
     public AddSchemaFieldsUpdateProcessor(UpdateRequestProcessor next, List<TypeMapping> typeMappings, SelectorParams inclusions, Collection<SelectorParams> exclusions,
-        SolrResourceLoader solrResourceLoader, String defaultFieldType) {
+        SolrResourceLoader solrResourceLoader, String defaultFieldType, SolrQueryRequest req) {
       super(next);
       this.inclusions = inclusions;
       this.typeMappings = typeMappings;
       this.exclusions = exclusions;
       this.solrResourceLoader = solrResourceLoader;
       this.defaultFieldType = defaultFieldType;
+      this.req = req;
     }
     
     @Override
@@ -509,7 +511,7 @@ public class AddSchemaFieldsUpdateProcessorFactory extends UpdateRequestProcesso
           cmd.getReq().updateSchemaToLatest();
         } catch (ManagedIndexSchema.SchemaChangedInZkException e) {
           try {
-            ((ManagedIndexSchema) cmd.getReq().getSchema()).getManagedIndexSchemaFactory().getZkIndexSchemaReader().updateSchema();
+            req.getCore().getZkIndexSchemaReader().updateSchema(false);
             cmd.getReq().updateSchemaToLatest();
 
             if (log.isDebugEnabled()) log.debug("Schema changed while processing request ... current latest version {} try={}", ((ManagedIndexSchema) cmd.getReq().getSchema()).getSchemaZkVersion(), cnt);
diff --git a/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
index d2d7fcc..b88be80 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
@@ -262,7 +262,7 @@ public class SearchHandlerTest extends SolrTestCaseJ4
       } catch (Exception e) {
         assertTrue("Unrecognized exception message: " + e, 
             e.getMessage().contains("no servers hosting shard:") 
-                || e.getMessage().contains("ZooKeeper is not connected"));
+                || e.getMessage().contains("SolrZkClient is not currently connected,,"));
       }
     }
     finally {
diff --git a/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java b/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java
deleted file mode 100644
index 321f41b..0000000
--- a/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.schema;
-
-import org.apache.solr.SolrTestCaseJ4;
-import org.apache.solr.schema.ZkIndexSchemaReader.SchemaWatcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-public class SchemaWatcherTest {
-
-  private ZkIndexSchemaReader mockSchemaReader;
-  private SchemaWatcher schemaWatcher;
-
-  @Before
-  public void setUp() throws Exception {
-    SolrTestCaseJ4.assumeWorkingMockito();
-    
-    mockSchemaReader = mock(ZkIndexSchemaReader.class);
-    schemaWatcher = new SchemaWatcher(mockSchemaReader);
-  }
-
-  @Test
-  public void testProcess() throws Exception {
-    schemaWatcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, "/test"));
-    verify(mockSchemaReader).updateSchema();
-  }
-
-}
diff --git a/solr/core/src/test/org/apache/solr/util/AuthToolTest.java b/solr/core/src/test/org/apache/solr/util/AuthToolTest.java
index 9bf4056..557c9c5 100644
--- a/solr/core/src/test/org/apache/solr/util/AuthToolTest.java
+++ b/solr/core/src/test/org/apache/solr/util/AuthToolTest.java
@@ -26,6 +26,7 @@ import org.apache.solr.cloud.SolrCloudTestCase;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.solr.util.SolrCLI.findTool;
@@ -34,6 +35,7 @@ import static org.apache.solr.util.SolrCLI.parseCmdLine;
 /**
  * Unit test for SolrCLI's AuthTool
  */
+@Ignore // MRM-TEST TODO:
 public class AuthToolTest extends SolrCloudTestCase {
   private Path dir;
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index caa7a77..edad06b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -55,7 +55,7 @@ public class ParWorkExecutor extends ThreadPoolExecutor {
     if (isShutdown()) {
       return;
     }
-    if (closeTracker != null) closeTracker.close();
+    assert closeTracker != null ? closeTracker.close() : true;
     setKeepAliveTime(1, TimeUnit.NANOSECONDS);
     for (int i = 0; i < Math.max(0, getPoolSize() - getActiveCount() + 1); i++) {
       try {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 236f85b..24006d4 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -88,8 +88,12 @@ public class ConnectionManager implements Watcher, Closeable {
   }
 
   public ZooKeeper getKeeper() {
-      SolrZooKeeper rKeeper = keeper;
-      return rKeeper;
+    if (isClosed) {
+      throw new AlreadyClosedException(this + " SolrZkClient is not currently connected state=" + keeper.getState());
+    }
+
+    SolrZooKeeper rKeeper = keeper;
+    return rKeeper;
   }
 
   public void setZkCredentialsToAddAutomatically(ZkCredentialsProvider zkCredentialsToAddAutomatically) {
@@ -222,10 +226,6 @@ public class ConnectionManager implements Watcher, Closeable {
         }
       }
 
-      //    if (isClosed()) {
-      //      log.debug("Client->ZooKeeper status change trigger but we are already closed");
-      //      return;
-      //    }
 
       KeeperState state = event.getState();
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index c761e3d..e94bcae 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -550,6 +550,7 @@ public class SolrZkClient implements Closeable {
       Watcher watcher, boolean failOnExists, boolean retryOnConnLoss, int skipPathParts) throws KeeperException, InterruptedException {
     ZooKeeper keeper = connManager.getKeeper();
     if (log.isDebugEnabled()) log.debug("makePath: {}", path);
+
     boolean retry = true;
     if (path.startsWith("/")) {
       path = path.substring(1);
@@ -1075,7 +1076,7 @@ public class SolrZkClient implements Closeable {
     isClosed = true;
     connManager.close();
 
-    if (closeTracker != null) closeTracker.close();
+    assert closeTracker != null ? closeTracker.close() : true;
     assert ObjectReleaseTracker.release(this);
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
index 108120c..7e548cc 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
@@ -103,7 +103,7 @@ public class SolrZooKeeper extends ZooKeeperAdmin {
 
   @Override
   public void close() {
-    if (closeTracker != null) closeTracker.close();
+    assert closeTracker != null ? closeTracker.close() : true;
     try {
       try {
         RequestHeader h = new RequestHeader();
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
index a0ab7af..b2fd548 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
@@ -66,6 +66,9 @@ public class ZkCmdExecutor {
   @SuppressWarnings("unchecked")
   public static <T> T retryOperation(ZkCmdExecutor zkCmdExecutor, ZkOperation operation)
       throws KeeperException, InterruptedException {
+    if (isClosed.isClosed()) {
+      throw new AlreadyClosedException(this + " SolrZkClient is already closed");
+    }
     KeeperException exception = null;
     int tryCnt = 0;
     while (tryCnt < zkCmdExecutor.retryCount) {
@@ -79,7 +82,7 @@ public class ZkCmdExecutor {
         if (!zkCmdExecutor.solrZkClient.getSolrZooKeeper().getState().isAlive()) {
           throw e;
         }
-        zkCmdExecutor.retryDelay(tryCnt);
+        retryDelay(tryCnt);
       }
       tryCnt++;
     }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 4ba3a57..45eaf98 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -212,7 +212,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
   private final ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>(32, 0.75f, 3);
 
-  private final Map<String,StateWatcher> stateWatchersMap = new ConcurrentHashMap<>(32, 0.75f, 3);
+  private final Map<String,CollectionStateWatcher> stateWatchersMap = new ConcurrentHashMap<>(32, 0.75f, 3);
 
   // named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map.
   private final ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsObservers = new ConcurrentHashMap<>();
@@ -229,6 +229,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
   private volatile Future<?> collectionPropsCacheCleaner; // only kept to identify if the cleaner has already been started.
   private volatile String node = null;
+  private volatile LiveNodeWatcher liveNodesWatcher;
+  private volatile CollectionsChildWatcher collectionsChildWatcher;
 
   public static interface CollectionRemoved {
     void removed(String collection);
@@ -348,6 +350,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     try {
       zkClient.start();
     } catch (RuntimeException re) {
+      log.error("Exception starting zkClient", re);
       zkClient.close(); // stuff has been opened inside the zkClient
       throw re;
     }
@@ -514,8 +517,12 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
       // on reconnect of SolrZkClient force refresh and re-add watches.
       loadClusterProperties();
-      refreshLiveNodes(new LiveNodeWatcher());
-      refreshCollectionList(new CollectionsChildWatcher());
+
+      IOUtils.closeQuietly(this.liveNodesWatcher);
+      this.liveNodesWatcher = new LiveNodeWatcher();
+      refreshLiveNodes(this.liveNodesWatcher);
+      this.collectionsChildWatcher = new CollectionsChildWatcher();
+      refreshCollectionList(collectionsChildWatcher);
 
       refreshAliases(aliasesManager);
 
@@ -646,33 +653,34 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   /**
    * Search for any lazy-loadable collections.
    */
-  private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
-    synchronized (refreshCollectionListLock) {
-      List<String> children = null;
-      try {
-        children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
-      } catch (KeeperException.NoNodeException e) {
-        log.warn("Error fetching collection names: [{}]", e.getMessage());
-        // fall through
-      }
-      if (children == null || children.isEmpty()) {
-        lazyCollectionStates.clear();
-        return;
-      }
+  private void refreshCollectionList(CollectionsChildWatcher watcher) throws KeeperException, InterruptedException {
+    if (watcher != null) {
+      IOUtils.closeQuietly(watcher);
+    }
+    List<String> children = null;
+    try {
+      children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
+    } catch (KeeperException.NoNodeException e) {
+      log.warn("Error fetching collection names: [{}]", e.getMessage());
+      // fall through
+    }
+    if (children == null || children.isEmpty()) {
+      lazyCollectionStates.clear();
+      return;
+    }
 
-      // Don't lock getUpdateLock() here, we don't need it and it would cause deadlock.
-      // Don't mess with watchedCollections, they should self-manage.
+    // Don't lock getUpdateLock() here, we don't need it and it would cause deadlock.
+    // Don't mess with watchedCollections, they should self-manage.
 
-      // First, drop any children that disappeared.
-      this.lazyCollectionStates.keySet().retainAll(children);
-      for (String coll : children) {
-        // We will create an eager collection for any interesting collections, so don't add to lazy.
-        if (!collectionWatches.containsKey(coll)) {
-          // Double check contains just to avoid allocating an object.
-          LazyCollectionRef existing = lazyCollectionStates.get(coll);
-          if (existing == null) {
-            lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
-          }
+    // First, drop any children that disappeared.
+    this.lazyCollectionStates.keySet().retainAll(children);
+    for (String coll : children) {
+      // We will create an eager collection for any interesting collections, so don't add to lazy.
+      if (!collectionWatches.containsKey(coll)) {
+        // Double check contains just to avoid allocating an object.
+        LazyCollectionRef existing = lazyCollectionStates.get(coll);
+        if (existing == null) {
+          lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
         }
       }
     }
@@ -787,7 +795,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   /**
    * Refresh live_nodes.
    */
-  private void refreshLiveNodes(Watcher watcher) throws KeeperException, InterruptedException {
+  private void refreshLiveNodes(LiveNodeWatcher watcher) throws KeeperException, InterruptedException {
+    if (watcher != null) {
+      IOUtils.closeQuietly(watcher);
+    }
 
     SortedSet<String> oldLiveNodes;
     SortedSet<String> newLiveNodes = null;
@@ -875,37 +886,29 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
   public void close() {
     if (log.isDebugEnabled()) log.debug("Closing ZkStateReader");
-    if (closeTracker != null) closeTracker.close();
-    this.closed = true;
+    assert closeTracker != null ? closeTracker.close() : true;
 
-    synchronized (this) {
-      if (collectionPropsCacheCleaner != null) {
-        collectionPropsCacheCleaner.cancel(true);
+    try {
+      IOUtils.closeQuietly(clusterPropertiesWatcher);
+      Future<?> cpc = collectionPropsCacheCleaner;
+      if (cpc != null) {
+        cpc.cancel(true);
       }
-    }
-
-//;
+      stateWatchersMap.forEach((s, stateWatcher) -> IOUtils.closeQuietly(stateWatcher));
+      stateWatchersMap.clear();
 
-    stateWatchersMap.forEach((s, stateWatcher) -> IOUtils.closeQuietly(stateWatcher));
-    stateWatchersMap.clear();
-
-    try {
+      IOUtils.closeQuietly(this.liveNodesWatcher);
+      IOUtils.closeQuietly(this.collectionsChildWatcher);
       if (closeClient) {
         IOUtils.closeQuietly(zkClient);
       }
-      try {
-        if (collectionPropsCacheCleaner != null) {
-          collectionPropsCacheCleaner.cancel(false);
-        }
-      } catch (NullPointerException e) {
-        // okay
-      }
-//      if (notifications != null) {
-//        notifications.shutdownNow();
-//      }
 
-//      waitLatches.forEach(c -> { for (int i = 0; i < c.getCount(); i++) c.countDown(); });
-//      waitLatches.clear();
+      //      if (notifications != null) {
+      //        notifications.shutdownNow();
+      //      }
+
+      //      waitLatches.forEach(c -> { for (int i = 0; i < c.getCount(); i++) c.countDown(); });
+      //      waitLatches.clear();
 
     } finally {
       assert ObjectReleaseTracker.release(this);
@@ -913,11 +916,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
   }
 
-  @Override
-  public boolean isClosed() {
-    return closed;
-  }
-
   public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException, TimeoutException {
     Replica replica = getLeaderRetry(collection, shard, timeout);
     return replica.getCoreUrl();
@@ -1177,18 +1175,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     return Collections.unmodifiableMap(clusterProperties);
   }
 
-  private final Watcher clusterPropertiesWatcher = event -> {
-    // session events are not change events, and do not remove the watcher
-    if (Watcher.Event.EventType.None.equals(event.getType())) {
-      return;
-    }
-    loadClusterProperties();
-  };
+  private final ClusterPropsWatcher clusterPropertiesWatcher = new ClusterPropsWatcher(ZkStateReader.CLUSTER_PROPS);
 
   @SuppressWarnings("unchecked")
   private void loadClusterProperties() {
     try {
         try {
+          IOUtils.closeQuietly(clusterPropertiesWatcher);
           byte[] data = zkClient.getData(ZkStateReader.CLUSTER_PROPS, clusterPropertiesWatcher, new Stat(), true);
           this.clusterProperties = ClusterProperties.convertCollectionDefaultsToNestedFormat((Map<String, Object>) Utils.fromJSON(data));
           log.debug("Loaded cluster properties: {}", this.clusterProperties);
@@ -1201,16 +1194,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
           if (log.isDebugEnabled()) {
             log.debug("Loaded empty cluster properties");
           }
-          // set an exists watch, and if the node has been created since the last call,
-          // read the data again
-          if (zkClient.exists(ZkStateReader.CLUSTER_PROPS, clusterPropertiesWatcher) == null)
-            return;
         }
     } catch (KeeperException e) {
       log.error("Error reading cluster properties from zookeeper", SolrZkClient.checkInterrupted(e));
     } catch (InterruptedException e) {
       log.info("interrupted");
     }
+
   }
 
   /**
@@ -1242,38 +1232,36 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * @return a map representing the key/value properties for the collection.
    */
   public Map<String, String> getCollectionProperties(final String collection, long cacheForMillis) {
-
-      Watcher watcher = null;
-      if (cacheForMillis > 0) {
-        watcher = collectionPropsWatchers.compute(collection,
-            (c, w) -> w == null ? new PropsWatcher(c, cacheForMillis) : w.renew(cacheForMillis));
-      }
-      VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
-      boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > System.nanoTime();
-      long untilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis, TimeUnit.MILLISECONDS);
-      Map<String, String> properties;
-      if (haveUnexpiredProps) {
-        properties = vprops.props;
-        vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
-      } else {
-        try {
-          VersionedCollectionProps vcp = fetchCollectionProperties(collection, watcher);
-          properties = vcp.props;
-          if (cacheForMillis > 0) {
-            vcp.cacheUntilNs = untilNs;
-            watchedCollectionProps.put(collection, vcp);
-          } else {
-            // we're synchronized on watchedCollectionProps and we can only get here if we have found an expired
-            // vprops above, so it is safe to remove the cached value and let the GC free up some mem a bit sooner.
-            if (!collectionPropsObservers.containsKey(collection)) {
-              watchedCollectionProps.remove(collection);
-            }
+    PropsWatcher watcher = null;
+    if (cacheForMillis > 0) {
+      watcher = collectionPropsWatchers.compute(collection, (c, w) -> w == null ? new PropsWatcher(c, cacheForMillis) : w.renew(cacheForMillis));
+    }
+    VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
+    boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > System.nanoTime();
+    long untilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis, TimeUnit.MILLISECONDS);
+    Map<String,String> properties;
+    if (haveUnexpiredProps) {
+      properties = vprops.props;
+      vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
+    } else {
+      try {
+        VersionedCollectionProps vcp = fetchCollectionProperties(collection, watcher);
+        properties = vcp.props;
+        if (cacheForMillis > 0) {
+          vcp.cacheUntilNs = untilNs;
+          watchedCollectionProps.put(collection, vcp);
+        } else {
+          // we're synchronized on watchedCollectionProps and we can only get here if we have found an expired
+          // vprops above, so it is safe to remove the cached value and let the GC free up some mem a bit sooner.
+          if (!collectionPropsObservers.containsKey(collection)) {
+            watchedCollectionProps.remove(collection);
           }
-        } catch (Exception e) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
         }
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
       }
-      return properties;
+    }
+    return properties;
   }
 
   private static class VersionedCollectionProps {
@@ -1292,7 +1280,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   }
 
   @SuppressWarnings("unchecked")
-  private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
+  private VersionedCollectionProps fetchCollectionProperties(String collection, PropsWatcher watcher) throws KeeperException, InterruptedException {
     final String znodePath = getCollectionPropsPath(collection);
     // lazy init cache cleaner once we know someone is using collection properties.
     if (collectionPropsCacheCleaner == null) {
@@ -1302,25 +1290,16 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         }
       }
     }
-    while (true) {
-      try {
-        Stat stat = new Stat();
-        byte[] data = zkClient.getData(znodePath, watcher, stat, true);
-        return new VersionedCollectionProps(stat.getVersion(), (Map<String, String>) Utils.fromJSON(data));
-      } catch (ClassCastException e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
-      } catch (KeeperException.NoNodeException e) {
-        if (watcher != null) {
-          // Leave an exists watch in place in case a collectionprops.json is created later.
-          Stat exists = zkClient.exists(znodePath, watcher, true);
-          if (exists != null) {
-            // Rare race condition, we tried to fetch the data and couldn't find it, then we found it exists.
-            // Loop and try again.
-            continue;
-          }
-        }
-        return new VersionedCollectionProps(-1, EMPTY_MAP);
-      }
+
+    try {
+      IOUtils.closeQuietly(watcher);
+      Stat stat = new Stat();
+      byte[] data = zkClient.getData(znodePath, watcher, stat, true);
+      return new VersionedCollectionProps(stat.getVersion(), (Map<String,String>) Utils.fromJSON(data));
+    } catch (ClassCastException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
+    } catch (KeeperException.NoNodeException e) {
+      return new VersionedCollectionProps(-1, EMPTY_MAP);
     }
   }
 
@@ -1366,11 +1345,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   /**
    * Watches a single collection's format2 state.json.
    */
-  class StateWatcher implements Watcher, Closeable {
+  class CollectionStateWatcher implements Watcher, Closeable {
     private final String coll;
-    private volatile Watcher watcher;
+    private volatile StateUpdateWatcher stateUpdateWatcher;
 
-    StateWatcher(String coll) {
+    CollectionStateWatcher(String coll) {
       this.coll = coll;
     }
 
@@ -1389,7 +1368,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
       if (!collectionWatches.containsKey(coll)) {
         // This collection is no longer interesting, stop watching.
-        log.debug("Uninteresting collection {}", coll);
+        if (log.isDebugEnabled()) log.debug("Uninteresting collection {}", coll);
         return;
       }
 
@@ -1398,8 +1377,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         log.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])", event, coll, liveNodes.size());
       }
 
-      refreshAndWatch();
-
+      refreshAndWatch(true);
     }
 
     /**
@@ -1407,10 +1385,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
      * As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates}
      * with the results of the refresh.
      */
-    public void refreshAndWatch() {
+    public void refreshAndWatch(boolean createWatcher) {
       try {
 
-        DocCollection newState = fetchCollectionState(coll, this);
+        Watcher watcher = (createWatcher ? this : null);
+        DocCollection newState = fetchCollectionState(coll, watcher);
 
         updateWatchedCollection(coll, newState);
 
@@ -1425,31 +1404,14 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       }
     }
 
-    public void watchStateUpdates() {
+    public void watchStateUpdates(boolean createWatcher) {
       if (log.isDebugEnabled()) log.debug("watch for additional state updates {}", coll);
-      String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
-
-      watcher = new Watcher() {
-        @Override
-        public void process(WatchedEvent event) {
-          if (isClosed()) {
-            return;
-          }
-          if (log.isDebugEnabled()) log.debug("_statupdates event {}", event);
-
-          try {
-
-            //            if (event.getType() == EventType.NodeDataChanged ||
-            //                event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated) {
-            processStateUpdates(stateUpdatesPath);
-            //            }
-
-          } catch (Exception e) {
-            log.error("Unwatched collection: [{}]", coll, e);
-          }
-        }
 
-      };
+      String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
+      if (createWatcher) {
+        IOUtils.closeQuietly(stateUpdateWatcher);
+        stateUpdateWatcher = new StateUpdateWatcher(stateUpdatesPath);
+      }
       try {
         processStateUpdates(stateUpdatesPath);
       } catch (Exception e) {
@@ -1461,11 +1423,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
       byte[] data = null;
 
-      Stat exists = zkClient.exists(stateUpdatesPath, watcher, true);
-
-      if (exists != null) {
-        data = getZkClient().getData(stateUpdatesPath, null, null, true);
-      }
+      IOUtils.closeQuietly(stateUpdateWatcher);
+      data = getZkClient().getData(stateUpdatesPath, stateUpdateWatcher, null, true);
 
       if (data == null) {
         return;
@@ -1610,38 +1569,71 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
     @Override
     public void close() throws IOException {
-      SolrZooKeeper zk = zkClient.getSolrZooKeeper();
-      if (zk != null && zkClient.isAlive()) {
-        try (ParWork work = new ParWork(this, false, false)) {
-          work.collect("", () -> {
+      try {
+        SolrZooKeeper zk = zkClient.getSolrZooKeeper();
+        if (zk != null) {
+          if (stateUpdateWatcher != null) {
             try {
-              zk.removeWatches(getCollectionSCNPath(coll), this, WatcherType.Any, true);
+              zk.removeWatches(getCollectionStateUpdatesPath(coll), stateUpdateWatcher, WatcherType.Any, true);
             } catch (KeeperException.NoWatcherException e) {
 
             } catch (Exception e) {
-              log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
+              if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
             }
-          });
-          if (watcher != null) {
-            work.collect("", () -> {
-              try {
-                zk.removeWatches(getCollectionStateUpdatesPath(coll), watcher, WatcherType.Any, true);
-              } catch (KeeperException.NoWatcherException e) {
+          }
+        }
+      } finally {
+        IOUtils.closeQuietly(stateUpdateWatcher);
+      }
+    }
 
-              } catch (Exception e) {
-                log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
-              }
-            });
+    private class StateUpdateWatcher implements Watcher, Closeable {
+      private final String stateUpdatesPath;
+
+      public StateUpdateWatcher(String stateUpdatesPath) {
+        this.stateUpdatesPath = stateUpdatesPath;
+      }
+
+      @Override
+      public void close() throws IOException {
+        SolrZooKeeper zk = zkClient.getSolrZooKeeper();
+        if (zk != null) {
+          try {
+            zk.removeWatches(getCollectionSCNPath(coll), this, WatcherType.Any, true);
+          } catch (KeeperException.NoWatcherException e) {
+
+          } catch (Exception e) {
+            if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
           }
         }
       }
+
+      @Override
+      public void process(WatchedEvent event) {
+        if (isClosed()) {
+          return;
+        }
+        if (log.isDebugEnabled()) log.debug("_statupdates event {}", event);
+
+        try {
+
+          //            if (event.getType() == EventType.NodeDataChanged ||
+          //                event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated) {
+          processStateUpdates(stateUpdatesPath);
+          //            }
+
+        } catch (Exception e) {
+          log.error("Unwatched collection: [{}]", coll, e);
+        }
+      }
+
     }
   }
 
   /**
    * Watches collection properties
    */
-  class PropsWatcher implements Watcher {
+  class PropsWatcher implements Watcher, Closeable {
     private final String coll;
     private long watchUntilNs;
 
@@ -1682,38 +1674,54 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       refreshAndWatch(true);
     }
 
+    @Override
+    public void close() throws IOException {
+      SolrZooKeeper zk = zkClient.getSolrZooKeeper();
+      String znodePath = getCollectionPropsPath(coll);
+      if (zk != null) {
+
+        try {
+          zk.removeWatches(znodePath, this, WatcherType.Any, true);
+        } catch (KeeperException.NoWatcherException e) {
+
+        } catch (Exception e) {
+          if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
+        }
+      }
+    }
+
     /**
      * Refresh collection properties from ZK and leave a watch for future changes. Updates the properties in
      * watchedCollectionProps with the results of the refresh. Optionally notifies watchers
      */
     void refreshAndWatch(boolean notifyWatchers) {
       try {
-        synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
-          VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
-          Map<String, String> properties = vcp.props;
-          VersionedCollectionProps existingVcp = watchedCollectionProps.get(coll);
-          if (existingVcp == null ||                   // never called before, record what we found
-              vcp.zkVersion > existingVcp.zkVersion || // newer info we should update
-              vcp.zkVersion == -1) {                   // node was deleted start over
-            watchedCollectionProps.put(coll, vcp);
-            if (notifyWatchers) {
-              notifyPropsWatchers(coll, properties);
-            }
-            if (vcp.zkVersion == -1 && existingVcp != null) { // Collection DELETE detected
 
-              // We should not be caching a collection that has been deleted.
-              watchedCollectionProps.remove(coll);
+        VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
+        Map<String,String> properties = vcp.props;
+        VersionedCollectionProps existingVcp = watchedCollectionProps.get(coll);
+        if (existingVcp == null ||                   // never called before, record what we found
+            vcp.zkVersion > existingVcp.zkVersion || // newer info we should update
+            vcp.zkVersion == -1) {                   // node was deleted start over
+          watchedCollectionProps.put(coll, vcp);
+          if (notifyWatchers) {
+            notifyPropsWatchers(coll, properties);
+          }
+          if (vcp.zkVersion == -1 && existingVcp != null) { // Collection DELETE detected
 
-              // core ref counting not relevant here, don't need canRemove(), we just sent
-              // a notification of an empty set of properties, no reason to watch what doesn't exist.
-              collectionPropsObservers.remove(coll);
+            // We should not be caching a collection that has been deleted.
+            watchedCollectionProps.remove(coll);
 
-              // This is the one time we know it's safe to throw this out. We just failed to set the watch
-              // due to an NoNodeException, so it isn't held by ZK and can't re-set itself due to an update.
-              collectionPropsWatchers.remove(coll);
-            }
+            // core ref counting not relevant here, don't need canRemove(), we just sent
+            // a notification of an empty set of properties, no reason to watch what doesn't exist.
+            collectionPropsObservers.remove(coll);
+
+            // This is the one time we know it's safe to throw this out. We just failed to set the watch
+            // due to an NoNodeException, so it isn't held by ZK and can't re-set itself due to an update.
+            collectionPropsWatchers.remove(coll);
           }
         }
+
       } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
         log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
       } catch (KeeperException e) {
@@ -1729,7 +1737,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   /**
    * Watches /collections children .
    */
-  class CollectionsChildWatcher implements Watcher {
+  class CollectionsChildWatcher implements Watcher, Closeable {
 
     @Override
     public void process(WatchedEvent event) {
@@ -1745,7 +1753,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       }
       log.debug("A collections change: [{}], has occurred - updating...", event);
       try {
-        refreshAndWatch();
+        refreshAndWatch(this);
       } catch (Exception e) {
         log.error("An error has occurred", e);
         return;
@@ -1754,9 +1762,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       constructState(Collections.emptySet(), "collection child watcher");
     }
 
-    public void refreshAndWatch() {
+    public void refreshAndWatch(CollectionsChildWatcher watcher) {
       try {
-        refreshCollectionList(this);
+        refreshCollectionList(watcher);
       } catch (KeeperException e) {
         log.error("A ZK error has occurred", e);
         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
@@ -1766,12 +1774,27 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         log.warn("Interrupted", e);
       }
     }
+
+    @Override
+    public void close() throws IOException {
+      SolrZooKeeper zk = zkClient.getSolrZooKeeper();
+      if (zk != null) {
+
+        try {
+          zk.removeWatches(COLLECTIONS_ZKNODE, this, WatcherType.Any, true);
+        } catch (KeeperException.NoWatcherException e) {
+
+        } catch (Exception e) {
+          if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
+        }
+      }
+    }
   }
 
   /**
    * Watches the live_nodes and syncs changes.
    */
-  class LiveNodeWatcher implements Watcher {
+  class LiveNodeWatcher implements Watcher, Closeable {
 
     @Override
     public void process(WatchedEvent event) {
@@ -1805,6 +1828,22 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         log.warn("Interrupted", e);
       }
     }
+
+
+    @Override
+    public void close() throws IOException {
+      SolrZooKeeper zk = zkClient.getSolrZooKeeper();
+      if (zk != null) {
+
+        try {
+          zk.removeWatches(ZkStateReader.LIVE_NODES_ZKNODE, this, WatcherType.Any, true);
+        } catch (KeeperException.NoWatcherException e) {
+
+        } catch (Exception e) {
+          if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
+        }
+      }
+    }
   }
 
   public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
@@ -1903,10 +1942,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       return v;
     });
     if (reconstructState.get()) {
-      StateWatcher sw = new StateWatcher(collection);
+      CollectionStateWatcher sw = new CollectionStateWatcher(collection);
       stateWatchersMap.put(collection, sw);
-      sw.refreshAndWatch();
-      sw.watchStateUpdates();
+      sw.refreshAndWatch(true);
+      sw.watchStateUpdates(true);
     }
 
   }
@@ -1920,7 +1959,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * <p>
    * Not a public API.  This method should only be called from ZkController.
    * <p>
-   * If no cores are registered for a collection, and there are no {@link CollectionStateWatcher}s
+   * If no cores are registered for a collection, and there are no {@link org.apache.solr.common.cloud.CollectionStateWatcher}s
    * for that collection either, the collection watch will be removed.
    *
    * @param collection the collection that the core belongs to
@@ -1970,7 +2009,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * @see #registerDocCollectionWatcher
    * @see #registerLiveNodesListener
    */
-  public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) {
+  public void registerCollectionStateWatcher(String collection, org.apache.solr.common.cloud.CollectionStateWatcher stateWatcher) {
     final DocCollectionAndLiveNodesWatcherWrapper wrapper
         = new DocCollectionAndLiveNodesWatcherWrapper(collection, stateWatcher);
 
@@ -2010,10 +2049,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     });
 
     if (watchSet.get()) {
-      StateWatcher sw = new StateWatcher(collection);
-      stateWatchersMap.put(collection, sw);
-      sw.refreshAndWatch();
-      sw.watchStateUpdates();
+      CollectionStateWatcher sw = new CollectionStateWatcher(collection);
+      sw.refreshAndWatch(true);
+      sw.watchStateUpdates(true);
     }
 
     DocCollection state = clusterState.getCollectionOrNull(collection);
@@ -2032,7 +2070,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * </p>
    *
    * <p>
-   * This implementation utilizes {@link CollectionStateWatcher} internally.
+   * This implementation utilizes {@link org.apache.solr.common.cloud.CollectionStateWatcher} internally.
    * Callers that don't care about liveNodes are encouraged to use a {@link DocCollection} {@link Predicate}
    * instead
    * </p>
@@ -2055,7 +2093,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     final CountDownLatch latch = new CountDownLatch(1);
     waitLatches.add(latch);
     AtomicReference<DocCollection> docCollection = new AtomicReference<>();
-    CollectionStateWatcher watcher = (n, c) -> {
+    org.apache.solr.common.cloud.CollectionStateWatcher watcher = (n, c) -> {
       // if (isClosed()) return true;
       docCollection.set(c);
       boolean matches = predicate.matches(this.liveNodes, c);
@@ -2169,7 +2207,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * @param watcher    the watcher
    * @see #registerCollectionStateWatcher
    */
-  public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
+  public void removeCollectionStateWatcher(String collection, org.apache.solr.common.cloud.CollectionStateWatcher watcher) {
     final DocCollectionAndLiveNodesWatcherWrapper wrapper
         = new DocCollectionAndLiveNodesWatcherWrapper(collection, watcher);
 
@@ -2204,7 +2242,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         log.info("no longer watch collection {}", collection);
         watchedCollectionStates.remove(collection);
         lazyCollectionStates.put(collection, new LazyCollectionRef(collection));
-        StateWatcher stateWatcher = stateWatchersMap.remove(collection);
+        CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
         if (stateWatcher != null) {
           IOUtils.closeQuietly(stateWatcher);
         }
@@ -2354,13 +2392,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         MDCLoggingContext.setNode(node);
       }
       List<DocCollectionWatcher> watchers = new ArrayList<>();
-      synchronized (collectionWatches) {
-        collectionWatches.compute(collection, (k, v) -> {
-          if (v == null) return null;
-          watchers.addAll(v.stateWatchers);
-          return v;
-        });
-      }
+
+      collectionWatches.compute(collection, (k, v) -> {
+        if (v == null) return null;
+        watchers.addAll(v.stateWatchers);
+        return v;
+      });
+
       for (DocCollectionWatcher watcher : watchers) {
         if (log.isDebugEnabled()) log.debug("Notify DocCollectionWatcher {} {}", watcher, collectionState);
         try {
@@ -2611,11 +2649,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
   /**
    * Helper class that acts as both a {@link DocCollectionWatcher} and a {@link LiveNodesListener}
-   * while wraping and delegating to a {@link CollectionStateWatcher}
+   * while wraping and delegating to a {@link org.apache.solr.common.cloud.CollectionStateWatcher}
    */
   private final class DocCollectionAndLiveNodesWatcherWrapper implements DocCollectionWatcher, LiveNodesListener {
     private final String collectionName;
-    private final CollectionStateWatcher delegate;
+    private final org.apache.solr.common.cloud.CollectionStateWatcher delegate;
 
     public int hashCode() {
       return collectionName.hashCode() * delegate.hashCode();
@@ -2632,7 +2670,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     }
 
     public DocCollectionAndLiveNodesWatcherWrapper(final String collectionName,
-                                                   final CollectionStateWatcher delegate) {
+                                                   final org.apache.solr.common.cloud.CollectionStateWatcher delegate) {
       this.collectionName = collectionName;
       this.delegate = delegate;
     }
@@ -2744,4 +2782,34 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       }
     }
   }
+
+  private class ClusterPropsWatcher implements Watcher, Closeable {
+
+    private final String path;
+
+    ClusterPropsWatcher(String path) {
+      this.path = path;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      // session events are not change events, and do not remove the watcher
+      if (EventType.None.equals(event.getType())) {
+        return;
+      }
+      loadClusterProperties();
+    }
+
+    @Override
+    public void close() throws IOException {
+      SolrZooKeeper zk = zkClient.getSolrZooKeeper();
+      try {
+        zk.removeWatches(path, this, WatcherType.Any, true);
+      } catch (KeeperException.NoWatcherException e) {
+
+      } catch (Exception e) {
+        log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
+      }
+    }
+  }
 }
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index 3b19b56..421faab 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -379,7 +379,7 @@ public class ZkTestServer implements Closeable {
 
   public synchronized void shutdown() throws IOException, InterruptedException {
     log.info("Shutting down ZkTestServer.");
-    if (closeTracker != null) closeTracker.close();
+    assert closeTracker != null ? closeTracker.close() : true;
     try {
       if (chRootClient != null && chRootClient.isConnected()) {
         chRootClient.printLayout();