Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/05 17:03:28 UTC

[lucene-solr] 08/11: @482 Everything is whack. I forget everything means everything. God, I can't believe I have been this far and further before and then just wandered away from it and forgot I did anything more than play with some good resource usage and http2. I must have curated for so long. Even refreshing on it a few times since and I still forget how much coverage the whackness has.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 9c1bdc66e2a28403bb38f3615ad64c6d8fcde9eb
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Aug 1 11:41:53 2020 -0500

    @482 Everything is whack. I forget everything means everything. God, I can't believe I have been this far and further before and then just wandered away from it and forgot I did anything more than play with some good resource usage and http2. I must have curated for so long. Even refreshing on it a few times since and I still forget how much coverage the whackness has.
---
 .../org/apache/solr/cloud/RecoveryStrategy.java    |  24 +-
 .../cloud/TestWaitForStateWithJettyShutdowns.java  |   6 +-
 .../org/apache/solr/common/ParWorkExecService.java |  18 +-
 .../apache/solr/common/cloud/ZkStateReader.java    | 327 +++++++++------------
 4 files changed, 163 insertions(+), 212 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index d335428..8cce948 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -34,6 +34,7 @@ import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -184,14 +185,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
   }
 
   /** Builds a new HttpSolrClient for use in recovery.  Caller must close */
-  private final HttpSolrClient buildRecoverySolrClient(final String leaderUrl) {
+  private final Http2SolrClient buildRecoverySolrClient(final String leaderUrl) {
     // workaround for SOLR-13605: get the configured timeouts & set them directly
     // (even though getRecoveryOnlyHttpClient() already has them set)
     final UpdateShardHandlerConfig cfg = cc.getConfig().getUpdateShardHandlerConfig();
-    return (new HttpSolrClient.Builder(leaderUrl)
-            .withConnectionTimeout(3)
-            .withSocketTimeout(5)
-            .withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient())
+    return (new Http2SolrClient.Builder(leaderUrl)
+            .withHttpClient(cc.getUpdateShardHandler().getUpdateOnlyHttpClient())
             .markInternalRequest()
             ).build();
   }
@@ -339,7 +338,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   final private void commitOnLeader(String leaderUrl) throws SolrServerException,
       IOException {
-    try (HttpSolrClient client = buildRecoverySolrClient(leaderUrl)) {
+    try (Http2SolrClient client = buildRecoverySolrClient(leaderUrl)) {
       UpdateRequest ureq = new UpdateRequest();
       ureq.setParams(new ModifiableSolrParams());
       ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, "terminal");
@@ -892,7 +891,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
         return leaderReplica;
       }
 
-      try (HttpSolrClient httpSolrClient = buildRecoverySolrClient(leaderReplica.getCoreUrl())) {
+      try (Http2SolrClient httpSolrClient = buildRecoverySolrClient(leaderReplica.getCoreUrl())) {
         SolrPingResponse resp = httpSolrClient.ping();
         return leaderReplica;
       } catch (IOException e) {
@@ -997,14 +996,15 @@ public class RecoveryStrategy implements Runnable, Closeable {
     int conflictWaitMs = zkController.getLeaderConflictResolveWait();
 
     int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "100"));
-    try (HttpSolrClient client = buildRecoverySolrClient(leaderBaseUrl)) {
-      client.setSoTimeout(readTimeout);
-      HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
-      prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
+    try (Http2SolrClient client = buildRecoverySolrClient(leaderBaseUrl)) {
+      client.request(prepCmd);
+      // nocommit
+//      HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
+//      prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
 
       log.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd);
 
-      mrr.future.get();
+     // mrr.future.get();
     }
   }
 }
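
For reference, a minimal sketch of how the new recovery client is used, assuming the SolrJ 8.x Http2SolrClient API; markInternalRequest() is specific to this branch, and the leader URL and shared client below are illustrative:

    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.Http2SolrClient;
    import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
    import org.apache.solr.client.solrj.request.UpdateRequest;

    // Sketch: commit on a (hypothetical) leader URL over HTTP/2. The explicit
    // connection/socket timeouts the old HttpSolrClient.Builder set are gone;
    // the client built here takes its settings from the shared Http2SolrClient
    // passed in via withHttpClient().
    void commitOnLeader(Http2SolrClient sharedClient, String leaderUrl)
        throws SolrServerException, java.io.IOException {
      try (Http2SolrClient client = new Http2SolrClient.Builder(leaderUrl)
          .withHttpClient(sharedClient) // reuse the pooled HTTP/2 connections
          .build()) {
        UpdateRequest ureq = new UpdateRequest();
        ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true);
        ureq.process(client); // synchronous commit against the leader
      }
    }
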
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
index a0859f6..d5ddcbe 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
@@ -67,12 +67,8 @@ public class TestWaitForStateWithJettyShutdowns extends SolrTestCaseJ4 {
       log.info("Wait to confirm our node is fully shutdown");
       cluster.waitForJettyToStop(nodeToStop);
 
-      // now that we're confident that node has stoped, check if a waitForState
-      // call will detect the missing replica -- shouldn't need long wait times (we know it's down)...
       log.info("Now check if waitForState will recognize we already have the exepcted state");
-      cluster.getSolrClient().waitForState(col_name, 500, TimeUnit.MILLISECONDS, clusterShape(1, 0));
-                                           
-      
+      cluster.waitForActiveCollection(col_name, 5000, TimeUnit.MILLISECONDS, 1, 0);
     } finally {
       cluster.shutdown();
     }
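
Side note on the test change: waitForActiveCollection() on MiniSolrCloudCluster (as used on this branch) blocks until the collection reports the given shard/replica shape or the timeout elapses, replacing the predicate-based waitForState() call. A rough usage sketch, with an illustrative collection name:

    import java.util.concurrent.TimeUnit;

    // After the node is confirmed stopped, wait up to 5s for the collection
    // to settle at 1 shard / 0 active replicas.
    cluster.waitForJettyToStop(nodeToStop);
    cluster.waitForActiveCollection("test_col", 5000, TimeUnit.MILLISECONDS, 1, 0);
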
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index def7c9b..6962ba1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -89,9 +89,9 @@ public class ParWorkExecService implements ExecutorService {
 
 
   public <T> Future<T> doSubmit(Callable<T> callable, boolean requiresAnotherThread) {
-    if (shutdown || terminated) {
-      throw new RejectedExecutionException();
-    }
+//    if (shutdown || terminated) {
+//      throw new RejectedExecutionException();
+//    }
     try {
       if (!requiresAnotherThread) {
         boolean success = checkLoad();
@@ -183,9 +183,9 @@ public class ParWorkExecService implements ExecutorService {
   }
 
   public Future<?> doSubmit(Runnable runnable, boolean requiresAnotherThread) {
-    if (shutdown || terminated) {
-      throw new RejectedExecutionException();
-    }
+//    if (shutdown || terminated) {
+//      throw new RejectedExecutionException();
+//    }
     if (!requiresAnotherThread) {
       boolean success = checkLoad();
       if (success) {
@@ -248,9 +248,9 @@ public class ParWorkExecService implements ExecutorService {
   public <T> List<Future<T>> invokeAll(
       Collection<? extends Callable<T>> collection)
       throws InterruptedException {
-    if (shutdown || terminated) {
-      throw new RejectedExecutionException();
-    }
+//    if (shutdown || terminated) {
+//      throw new RejectedExecutionException();
+//    }
     List<Future<T>> futures = new ArrayList<>(collection.size());
     for (Callable c : collection) {
       futures.add(submit(c));
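
ParWorkExecService is a branch-specific executor, so the effect of commenting out these guards depends on its internals, but the checks were emulating the standard JDK contract: an ExecutorService that has been shut down rejects new work. A minimal sketch of that baseline behavior:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;

    // Standard JDK contract the removed checks mirrored: once shutdown() has
    // been called, further submissions throw RejectedExecutionException.
    ExecutorService exec = Executors.newFixedThreadPool(2);
    exec.shutdown();
    try {
      exec.submit(() -> System.out.println("too late"));
    } catch (RejectedExecutionException e) {
      // expected: the pool no longer accepts tasks after shutdown()
    }

With the guards commented out, submissions during shutdown are no longer rejected up front and instead fall through to the normal load-check/submit path.
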
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 50739c8..6beea1a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -213,14 +213,14 @@ public class ZkStateReader implements SolrCloseable {
 
   private final Runnable securityNodeListener;
 
-  private ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>();
+  private ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>(16, 0.75f, 5);
 
   // named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map.
-  private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsObservers = new ConcurrentHashMap<>();
+  private ConcurrentHashMap<String, CollectionPropsWatcher> collectionPropsObservers = new ConcurrentHashMap<>(16, 0.75f, 5);
 
   private Set<CloudCollectionsListener> cloudCollectionsListeners = ConcurrentHashMap.newKeySet();
 
-  private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
+  private final ExecutorService notifications = ParWork.getExecutor();
 
   private Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();
 
@@ -229,7 +229,7 @@ public class ZkStateReader implements SolrCloseable {
   /**
    * Used to submit notifications to Collection Properties watchers in order
    **/
-  private final ExecutorService collectionPropsNotifications = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("collectionPropsNotifications"));
+  private final ExecutorService collectionPropsNotifications = ParWork.getExecutor();
 
   private static final long LAZY_CACHE_TIME = TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
 
@@ -496,8 +496,20 @@ public class ZkStateReader implements SolrCloseable {
       throw new SolrException(ErrorCode.SERVER_ERROR, e);
     }
 
-    collectionPropsObservers.forEach((k, v) -> {
-      collectionPropsWatchers.computeIfAbsent(k, PropsWatcher::new).refreshAndWatch(true);
+    collectionPropsObservers.forEach((c, v) -> {
+      Stat stat = new Stat();
+      byte[] data = new byte[0];
+      try {
+        data = zkClient.getData(getCollectionPropsPath(c), new PropsWatcher(c), stat);
+      } catch (KeeperException e) {
+        log.error("KeeperException", e);
+      } catch (InterruptedException e) {
+        ParWork.propegateInterrupt(e);
+      }
+
+      VersionedCollectionProps props = new VersionedCollectionProps(
+          stat.getVersion(), (Map<String,String>) fromJSON(data));
+      watchedCollectionProps.put(c, props);
     });
   }
 
@@ -944,7 +956,7 @@ public class ZkStateReader implements SolrCloseable {
 
   public Replica getLeader(Set<String> liveNodes, DocCollection docCollection, String shard) {
     Replica replica = docCollection != null ? docCollection.getLeader(shard) : null;
-    if (replica != null && liveNodes.contains(replica.getNodeName())) {
+    if (replica != null && liveNodes.contains(replica.getNodeName()) && replica.getState() == Replica.State.ACTIVE) {
       return replica;
     }
     return null;
@@ -1194,40 +1206,33 @@ public class ZkStateReader implements SolrCloseable {
    * @return a map representing the key/value properties for the collection.
    */
   public Map<String, String> getCollectionProperties(final String collection, long cacheForMillis) {
-    synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
-      Watcher watcher = null;
-      if (cacheForMillis > 0) {
-        watcher = collectionPropsWatchers.compute(collection,
-            (c, w) -> w == null ? new PropsWatcher(c, cacheForMillis) : w.renew(cacheForMillis));
-      }
-      VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
-      boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > System.nanoTime();
-      long untilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis, TimeUnit.MILLISECONDS);
-      Map<String, String> properties;
-      if (haveUnexpiredProps) {
-        properties = vprops.props;
-        vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
-      } else {
-        try {
-          VersionedCollectionProps vcp = fetchCollectionProperties(collection, watcher);
-          properties = vcp.props;
-          if (cacheForMillis > 0) {
-            vcp.cacheUntilNs = untilNs;
-            watchedCollectionProps.put(collection, vcp);
-          } else {
-            // we're synchronized on watchedCollectionProps and we can only get here if we have found an expired
-            // vprops above, so it is safe to remove the cached value and let the GC free up some mem a bit sooner.
-            if (!collectionPropsObservers.containsKey(collection)) {
-              watchedCollectionProps.remove(collection);
-            }
+    VersionedCollectionProps properties = watchedCollectionProps.get(collection);
+
+    if (properties == null) {
+      synchronized (watchedCollectionProps) {
+        properties = watchedCollectionProps.get(collection);
+        if (properties == null) {
+          PropsWatcher propsWatcher = new PropsWatcher(collection);
+          // load it
+          Stat stat = new Stat();
+          try {
+            byte[] data = zkClient.getData(getCollectionPropsPath(collection), propsWatcher, stat);
+
+            VersionedCollectionProps props = new VersionedCollectionProps(
+                stat.getVersion(), (Map<String,String>) fromJSON(data));
+            watchedCollectionProps.put(collection, props);
+            properties = props;
+          } catch (KeeperException e) {
+            log.error("KeeperException", e);
+          } catch (InterruptedException e) {
+            ParWork.propegateInterrupt(e);
           }
-        } catch (Exception e) {
-          ParWork.propegateInterrupt(e);
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
+
         }
       }
-      return properties;
     }
+
+    return properties.props;
   }
 
   private class VersionedCollectionProps {
@@ -1245,38 +1250,38 @@ public class ZkStateReader implements SolrCloseable {
     return COLLECTIONS_ZKNODE + '/' + collection + '/' + COLLECTION_PROPS_ZKNODE;
   }
 
-  @SuppressWarnings("unchecked")
-  private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
-    final String znodePath = getCollectionPropsPath(collection);
-    // lazy init cache cleaner once we know someone is using collection properties.
-    if (collectionPropsCacheCleaner == null) {
-      synchronized (this) { // There can be only one! :)
-        if (collectionPropsCacheCleaner == null) {
-          collectionPropsCacheCleaner = notifications.submit(new CacheCleaner());
-        }
-      }
-    }
-    while (true) {
-      try {
-        Stat stat = new Stat();
-        byte[] data = zkClient.getData(znodePath, watcher, stat);
-        return new VersionedCollectionProps(stat.getVersion(), (Map<String, String>) Utils.fromJSON(data));
-      } catch (ClassCastException e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
-      } catch (KeeperException.NoNodeException e) {
-        if (watcher != null) {
-          // Leave an exists watch in place in case a collectionprops.json is created later.
-          Stat exists = zkClient.exists(znodePath, watcher);
-          if (exists != null) {
-            // Rare race condition, we tried to fetch the data and couldn't find it, then we found it exists.
-            // Loop and try again.
-            continue;
-          }
-        }
-        return new VersionedCollectionProps(-1, EMPTY_MAP);
-      }
-    }
-  }
+//  @SuppressWarnings("unchecked")
+//  private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
+//    final String znodePath = getCollectionPropsPath(collection);
+//    // lazy init cache cleaner once we know someone is using collection properties.
+//    if (collectionPropsCacheCleaner == null) {
+//      synchronized (this) { // There can be only one! :)
+//        if (collectionPropsCacheCleaner == null) {
+//          collectionPropsCacheCleaner = notifications.submit(new CacheCleaner());
+//        }
+//      }
+//    }
+//    while (true) {
+//      try {
+//        Stat stat = new Stat();
+//        byte[] data = zkClient.getData(znodePath, watcher, stat);
+//        return new VersionedCollectionProps(stat.getVersion(), (Map<String, String>) Utils.fromJSON(data));
+//      } catch (ClassCastException e) {
+//        throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
+//      } catch (KeeperException.NoNodeException e) {
+//        if (watcher != null) {
+//          // Leave an exists watch in place in case a collectionprops.json is created later.
+//          Stat exists = zkClient.exists(znodePath, watcher);
+//          if (exists != null) {
+//            // Rare race condition, we tried to fetch the data and couldn't find it, then we found it exists.
+//            // Loop and try again.
+//            continue;
+//          }
+//        }
+//        return new VersionedCollectionProps(-1, EMPTY_MAP);
+//      }
+//    }
+//  }
 
   /**
    * Returns the content of /security.json from ZooKeeper as a Map
@@ -1425,11 +1430,6 @@ public class ZkStateReader implements SolrCloseable {
       watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis, TimeUnit.MILLISECONDS);
     }
 
-    public PropsWatcher renew(long forMillis) {
-      watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis, TimeUnit.MILLISECONDS);
-      return this;
-    }
-
     @Override
     public void process(WatchedEvent event) {
       // session events are not change events, and do not remove the watcher
@@ -1437,59 +1437,76 @@ public class ZkStateReader implements SolrCloseable {
         return;
       }
 
-      boolean expired = System.nanoTime() > watchUntilNs;
-      if (!collectionPropsObservers.containsKey(coll) && expired) {
-        // No one can be notified of the change, we can ignore it and "unset" the watch
-        log.debug("Ignoring property change for collection {}", coll);
-        return;
-      }
+      if (EventType.NodeDataChanged.equals(event.getType())) {
+        // load it
+        Stat stat = new Stat();
+        try {
+          byte[] data = zkClient.getData(getCollectionPropsPath(coll), this, stat);
 
-      log.info("A collection property change: [{}] for collection [{}] has occurred - updating...",
-          event, coll);
+          VersionedCollectionProps props = new VersionedCollectionProps(
+              stat.getVersion(), (Map<String,String>) fromJSON(data));
+          watchedCollectionProps.put(coll, props);
 
-      refreshAndWatch(true);
+          try (ParWork work = new ParWork(this, true)) {
+            for (CollectionPropsWatcher observer : collectionPropsObservers.values()) {
+              work.collect(() -> {
+                observer.onStateChanged(props.props);
+              });
+            }
+          }
+
+          //        Stat stat = new Stat();
+          //        byte[] data = zkClient.getData(znodePath, watcher, stat);
+          //        return new VersionedCollectionProps(stat.getVersion(), (Map<String, String>) Utils.fromJSON(data));
+        } catch (KeeperException e) {
+          log.error("", e);
+        } catch (InterruptedException e) {
+          ParWork.propegateInterrupt(e);
+        }
+      }
+          
     }
 
     /**
      * Refresh collection properties from ZK and leave a watch for future changes. Updates the properties in
      * watchedCollectionProps with the results of the refresh. Optionally notifies watchers
      */
-    void refreshAndWatch(boolean notifyWatchers) {
-      try {
-        synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
-          VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
-          Map<String, String> properties = vcp.props;
-          VersionedCollectionProps existingVcp = watchedCollectionProps.get(coll);
-          if (existingVcp == null ||                   // never called before, record what we found
-              vcp.zkVersion > existingVcp.zkVersion || // newer info we should update
-              vcp.zkVersion == -1) {                   // node was deleted start over
-            watchedCollectionProps.put(coll, vcp);
-            if (notifyWatchers) {
-              notifyPropsWatchers(coll, properties);
-            }
-            if (vcp.zkVersion == -1 && existingVcp != null) { // Collection DELETE detected
-
-              // We should not be caching a collection that has been deleted.
-              watchedCollectionProps.remove(coll);
-
-              // core ref counting not relevant here, don't need canRemove(), we just sent
-              // a notification of an empty set of properties, no reason to watch what doesn't exist.
-              collectionPropsObservers.remove(coll);
-
-              // This is the one time we know it's safe to throw this out. We just failed to set the watch
-              // due to an NoNodeException, so it isn't held by ZK and can't re-set itself due to an update.
-              collectionPropsWatchers.remove(coll);
-            }
-          }
-        }
-      } catch (KeeperException e) {
-        log.error("Lost collection property watcher for {} due to ZK error", coll, e);
-        throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        log.error("Lost collection property watcher for {} due to the thread being interrupted", coll, e);
-      }
-    }
+//    void refreshAndWatch(boolean notifyWatchers) {
+//      try {
+//        synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
+//          VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
+//          Map<String, String> properties = vcp.props;
+//          VersionedCollectionProps existingVcp = watchedCollectionProps.get(coll);
+//          if (existingVcp == null ||                   // never called before, record what we found
+//              vcp.zkVersion > existingVcp.zkVersion || // newer info we should update
+//              vcp.zkVersion == -1) {                   // node was deleted start over
+//            watchedCollectionProps.put(coll, vcp);
+//            if (notifyWatchers) {
+//              notifyPropsWatchers(coll, properties);
+//            }
+//            if (vcp.zkVersion == -1 && existingVcp != null) { // Collection DELETE detected
+//
+//              // We should not be caching a collection that has been deleted.
+//              watchedCollectionProps.remove(coll);
+//
+//              // core ref counting not relevant here, don't need canRemove(), we just sent
+//              // a notification of an empty set of properties, no reason to watch what doesn't exist.
+//              collectionPropsObservers.remove(coll);
+//
+//              // This is the one time we know it's safe to throw this out. We just failed to set the watch
+//              // due to an NoNodeException, so it isn't held by ZK and can't re-set itself due to an update.
+//              collectionPropsWatchers.remove(coll);
+//            }
+//          }
+//        }
+//      } catch (KeeperException e) {
+//        log.error("Lost collection property watcher for {} due to ZK error", coll, e);
+//        throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+//      } catch (InterruptedException e) {
+//        Thread.currentThread().interrupt();
+//        log.error("Lost collection property watcher for {} due to the thread being interrupted", coll, e);
+//      }
+//    }
   }
 
   /**
@@ -1974,38 +1991,15 @@ public class ZkStateReader implements SolrCloseable {
     return updated;
   }
 
-  public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) {
-    AtomicBoolean watchSet = new AtomicBoolean(false);
-    collectionPropsObservers.compute(collection, (k, v) -> {
-      if (v == null) {
-        v = new CollectionWatch<>();
-        watchSet.set(true);
-      }
-      v.stateWatchers.add(propsWatcher);
-      return v;
-    });
-
-    if (watchSet.get()) {
-      collectionPropsWatchers.computeIfAbsent(collection, PropsWatcher::new).refreshAndWatch(false);
-    }
+  public void removeCollectionPropsWatcher(String collection, CollectionPropsWatcher watcher) {
+    collectionPropsObservers.remove(collection);
   }
 
-  public void removeCollectionPropsWatcher(String collection, CollectionPropsWatcher watcher) {
-    collectionPropsObservers.compute(collection, (k, v) -> {
-      if (v == null)
-        return null;
-      v.stateWatchers.remove(watcher);
-      if (v.canBeRemoved()) {
-        // don't want this to happen in middle of other blocks that might add it back.
-        synchronized (watchedCollectionProps) {
-          watchedCollectionProps.remove(collection);
-        }
-        return null;
-      }
-      return v;
-    });
+  public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) {
+    collectionPropsObservers.put(collection, propsWatcher);
   }
 
+
   public static class ConfigData {
     public Map<String, Object> data;
     public int version;
@@ -2239,45 +2233,6 @@ public class ZkStateReader implements SolrCloseable {
 
   }
 
-  private void notifyPropsWatchers(String collection, Map<String, String> properties) {
-    try {
-      collectionPropsNotifications.submit(new PropsNotification(collection, properties));
-    } catch (RejectedExecutionException e) {
-      if (!closed) {
-        log.error("Couldn't run collection properties notifications for {}", collection, e);
-      }
-    }
-  }
-
-  private class PropsNotification implements Runnable {
-
-    private final String collection;
-    private final Map<String, String> collectionProperties;
-    private final List<CollectionPropsWatcher> watchers = new ArrayList<>();
-
-    private PropsNotification(String collection, Map<String, String> collectionProperties) {
-      this.collection = collection;
-      this.collectionProperties = collectionProperties;
-      // guarantee delivery of notification regardless of what happens to collectionPropsObservers
-      // while we wait our turn in the executor by capturing the list on creation.
-      collectionPropsObservers.compute(collection, (k, v) -> {
-        if (v == null)
-          return null;
-        watchers.addAll(v.stateWatchers);
-        return v;
-      });
-    }
-
-    @Override
-    public void run() {
-      for (CollectionPropsWatcher watcher : watchers) {
-        if (watcher.onStateChanged(collectionProperties)) {
-          removeCollectionPropsWatcher(collection, watcher);
-        }
-      }
-    }
-  }
-
   private class CacheCleaner implements Runnable {
     public void run() {
       while (!Thread.interrupted()) {
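
The ZkStateReader portion is the bulk of the change: fetchCollectionProperties()/refreshAndWatch() and the PropsNotification machinery are commented out, and collection properties are now kept in a ConcurrentHashMap that is filled by a direct getData()-plus-watch and refreshed whenever the PropsWatcher sees NodeDataChanged (ZK watches are one-shot, so the watcher re-registers itself by passing "this" back to getData()). A stripped-down sketch of that pattern against the raw ZooKeeper client, with illustrative class and path names rather than the branch's exact helpers:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    class PropsCache {
      private final ZooKeeper zk;
      private final Map<String, byte[]> cache = new ConcurrentHashMap<>();

      PropsCache(ZooKeeper zk) { this.zk = zk; }

      /** Read-through: the first caller fetches from ZK and leaves a data watch. */
      byte[] get(String path) throws KeeperException, InterruptedException {
        byte[] cached = cache.get(path);
        return cached != null ? cached : fetchAndWatch(path);
      }

      private byte[] fetchAndWatch(String path) throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        // getData() with a watcher; ZK fires it once on the next change, so the
        // callback must re-arm the watch by fetching again.
        byte[] data = zk.getData(path, event -> {
          if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
            try {
              fetchAndWatch(path); // refresh the cache and re-register the watch
            } catch (KeeperException e) {
              cache.remove(path);  // node gone or ZK error: drop the stale entry
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
        }, stat);
        cache.put(path, data);
        return data;
      }
    }
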