You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/01 16:52:57 UTC

[lucene-solr] branch reference_impl_dev updated (51f4f1f -> 821ff29)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a change to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


 discard 51f4f1f  @482 Everything is whack. I forget everything means everything. God, I can't believe I have been this far and further before and then just wandered away from it and forgot I did anything more than play with some good resource usage and http2. I must have curated for so long. Even refreshing on it a few times sense and I still forget how much coverage the whackness has.
     new 509e976  @482 Everything is whack. I forget everything means everything. God, I can't believe I have been this far and further before and then just wandered away from it and forgot I did anything more than play with some good resource usage and http2. I must have curated for so long. Even refreshing on it a few times since and I still forget how much coverage the whackness has.
     new 821ff29  @483 Lean into that chisel.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (51f4f1f)
            \
             N -- N -- N   refs/heads/reference_impl_dev (821ff29)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/solr/cloud/ZkController.java   | 12 +++++----
 .../apache/solr/common/cloud/ZkStateReader.java    | 30 ++++++----------------
 2 files changed, 15 insertions(+), 27 deletions(-)


[lucene-solr] 01/02: @482 Everything is whack. I forget everything means everything. God, I can't believe I have been this far and further before and then just wandered away from it and forgot I did anything more than play with some good resource usage and http2. I must have curated for so long. Even refreshing on it a few times since and I still forget how much coverage the whackness has.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 509e976525d95ce2afabaa033f89e83e8916eb2e
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Aug 1 11:41:53 2020 -0500

    @482 Everything is whack. I forget everything means everything. God, I can't believe I have been this far and further before and then just wandered away from it and forgot I did anything more than play with some good resource usage and http2. I must have curated for so long. Even refreshing on it a few times since and I still forget how much coverage the whackness has.
---
 .../org/apache/solr/cloud/RecoveryStrategy.java    |  24 +-
 .../cloud/TestWaitForStateWithJettyShutdowns.java  |   6 +-
 .../org/apache/solr/common/ParWorkExecService.java |  18 +-
 .../apache/solr/common/cloud/ZkStateReader.java    | 327 +++++++++------------
 4 files changed, 163 insertions(+), 212 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index d335428..8cce948 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -34,6 +34,7 @@ import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -184,14 +185,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
   }
 
   /** Builds a new HttpSolrClient for use in recovery.  Caller must close */
-  private final HttpSolrClient buildRecoverySolrClient(final String leaderUrl) {
+  private final Http2SolrClient buildRecoverySolrClient(final String leaderUrl) {
     // workaround for SOLR-13605: get the configured timeouts & set them directly
     // (even though getRecoveryOnlyHttpClient() already has them set)
     final UpdateShardHandlerConfig cfg = cc.getConfig().getUpdateShardHandlerConfig();
-    return (new HttpSolrClient.Builder(leaderUrl)
-            .withConnectionTimeout(3)
-            .withSocketTimeout(5)
-            .withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient())
+    return (new Http2SolrClient.Builder(leaderUrl)
+            .withHttpClient(cc.getUpdateShardHandler().getUpdateOnlyHttpClient())
             .markInternalRequest()
             ).build();
   }
@@ -339,7 +338,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   final private void commitOnLeader(String leaderUrl) throws SolrServerException,
       IOException {
-    try (HttpSolrClient client = buildRecoverySolrClient(leaderUrl)) {
+    try (Http2SolrClient client = buildRecoverySolrClient(leaderUrl)) {
       UpdateRequest ureq = new UpdateRequest();
       ureq.setParams(new ModifiableSolrParams());
       ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, "terminal");
@@ -892,7 +891,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
         return leaderReplica;
       }
 
-      try (HttpSolrClient httpSolrClient = buildRecoverySolrClient(leaderReplica.getCoreUrl())) {
+      try (Http2SolrClient httpSolrClient = buildRecoverySolrClient(leaderReplica.getCoreUrl())) {
         SolrPingResponse resp = httpSolrClient.ping();
         return leaderReplica;
       } catch (IOException e) {
@@ -997,14 +996,15 @@ public class RecoveryStrategy implements Runnable, Closeable {
     int conflictWaitMs = zkController.getLeaderConflictResolveWait();
 
     int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "100"));
-    try (HttpSolrClient client = buildRecoverySolrClient(leaderBaseUrl)) {
-      client.setSoTimeout(readTimeout);
-      HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
-      prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
+    try (Http2SolrClient client = buildRecoverySolrClient(leaderBaseUrl)) {
+      client.request(prepCmd);
+      // nocommit
+//      HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
+//      prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
 
       log.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd);
 
-      mrr.future.get();
+     // mrr.future.get();
     }
   }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
index a0859f6..d5ddcbe 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
@@ -67,12 +67,8 @@ public class TestWaitForStateWithJettyShutdowns extends SolrTestCaseJ4 {
       log.info("Wait to confirm our node is fully shutdown");
       cluster.waitForJettyToStop(nodeToStop);
 
-      // now that we're confident that node has stoped, check if a waitForState
-      // call will detect the missing replica -- shouldn't need long wait times (we know it's down)...
       log.info("Now check if waitForState will recognize we already have the exepcted state");
-      cluster.getSolrClient().waitForState(col_name, 500, TimeUnit.MILLISECONDS, clusterShape(1, 0));
-                                           
-      
+      cluster.waitForActiveCollection(col_name, 5000, TimeUnit.MILLISECONDS, 1, 0);
     } finally {
       cluster.shutdown();
     }
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index def7c9b..6962ba1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -89,9 +89,9 @@ public class ParWorkExecService implements ExecutorService {
 
 
   public <T> Future<T> doSubmit(Callable<T> callable, boolean requiresAnotherThread) {
-    if (shutdown || terminated) {
-      throw new RejectedExecutionException();
-    }
+//    if (shutdown || terminated) {
+//      throw new RejectedExecutionException();
+//    }
     try {
       if (!requiresAnotherThread) {
         boolean success = checkLoad();
@@ -183,9 +183,9 @@ public class ParWorkExecService implements ExecutorService {
   }
 
   public Future<?> doSubmit(Runnable runnable, boolean requiresAnotherThread) {
-    if (shutdown || terminated) {
-      throw new RejectedExecutionException();
-    }
+//    if (shutdown || terminated) {
+//      throw new RejectedExecutionException();
+//    }
     if (!requiresAnotherThread) {
       boolean success = checkLoad();
       if (success) {
@@ -248,9 +248,9 @@ public class ParWorkExecService implements ExecutorService {
   public <T> List<Future<T>> invokeAll(
       Collection<? extends Callable<T>> collection)
       throws InterruptedException {
-    if (shutdown || terminated) {
-      throw new RejectedExecutionException();
-    }
+//    if (shutdown || terminated) {
+//      throw new RejectedExecutionException();
+//    }
     List<Future<T>> futures = new ArrayList<>(collection.size());
     for (Callable c : collection) {
       futures.add(submit(c));
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 50739c8..6beea1a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -213,14 +213,14 @@ public class ZkStateReader implements SolrCloseable {
 
   private final Runnable securityNodeListener;
 
-  private ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>();
+  private ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>(16, 0.75f, 5);
 
   // named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map.
-  private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsObservers = new ConcurrentHashMap<>();
+  private ConcurrentHashMap<String, CollectionPropsWatcher> collectionPropsObservers = new ConcurrentHashMap<>(16, 0.75f, 5);
 
   private Set<CloudCollectionsListener> cloudCollectionsListeners = ConcurrentHashMap.newKeySet();
 
-  private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
+  private final ExecutorService notifications = ParWork.getExecutor();
 
   private Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();
 
@@ -229,7 +229,7 @@ public class ZkStateReader implements SolrCloseable {
   /**
    * Used to submit notifications to Collection Properties watchers in order
    **/
-  private final ExecutorService collectionPropsNotifications = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("collectionPropsNotifications"));
+  private final ExecutorService collectionPropsNotifications = ParWork.getExecutor();
 
   private static final long LAZY_CACHE_TIME = TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
 
@@ -496,8 +496,20 @@ public class ZkStateReader implements SolrCloseable {
       throw new SolrException(ErrorCode.SERVER_ERROR, e);
     }
 
-    collectionPropsObservers.forEach((k, v) -> {
-      collectionPropsWatchers.computeIfAbsent(k, PropsWatcher::new).refreshAndWatch(true);
+    collectionPropsObservers.forEach((c, v) -> {
+      Stat stat = new Stat();
+      byte[] data = new byte[0];
+      try {
+        data = zkClient.getData(getCollectionPropsPath(c), new PropsWatcher(c), stat);
+      } catch (KeeperException e) {
+        log.error("KeeperException", e);
+      } catch (InterruptedException e) {
+        ParWork.propegateInterrupt(e);
+      }
+
+      VersionedCollectionProps props = new VersionedCollectionProps(
+          stat.getVersion(), (Map<String,String>) fromJSON(data));
+      watchedCollectionProps.put(c, props);
     });
   }
 
@@ -944,7 +956,7 @@ public class ZkStateReader implements SolrCloseable {
 
   public Replica getLeader(Set<String> liveNodes, DocCollection docCollection, String shard) {
     Replica replica = docCollection != null ? docCollection.getLeader(shard) : null;
-    if (replica != null && liveNodes.contains(replica.getNodeName())) {
+    if (replica != null && liveNodes.contains(replica.getNodeName()) && replica.getState() == Replica.State.ACTIVE) {
       return replica;
     }
     return null;
@@ -1194,40 +1206,33 @@ public class ZkStateReader implements SolrCloseable {
    * @return a map representing the key/value properties for the collection.
    */
   public Map<String, String> getCollectionProperties(final String collection, long cacheForMillis) {
-    synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
-      Watcher watcher = null;
-      if (cacheForMillis > 0) {
-        watcher = collectionPropsWatchers.compute(collection,
-            (c, w) -> w == null ? new PropsWatcher(c, cacheForMillis) : w.renew(cacheForMillis));
-      }
-      VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
-      boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > System.nanoTime();
-      long untilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis, TimeUnit.MILLISECONDS);
-      Map<String, String> properties;
-      if (haveUnexpiredProps) {
-        properties = vprops.props;
-        vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
-      } else {
-        try {
-          VersionedCollectionProps vcp = fetchCollectionProperties(collection, watcher);
-          properties = vcp.props;
-          if (cacheForMillis > 0) {
-            vcp.cacheUntilNs = untilNs;
-            watchedCollectionProps.put(collection, vcp);
-          } else {
-            // we're synchronized on watchedCollectionProps and we can only get here if we have found an expired
-            // vprops above, so it is safe to remove the cached value and let the GC free up some mem a bit sooner.
-            if (!collectionPropsObservers.containsKey(collection)) {
-              watchedCollectionProps.remove(collection);
-            }
+    VersionedCollectionProps properties = watchedCollectionProps.get(collection);
+
+    if (properties == null) {
+      synchronized (watchedCollectionProps) {
+        properties = watchedCollectionProps.get(collection);
+        if (properties == null) {
+          PropsWatcher propsWatcher = new PropsWatcher(collection);
+          // load it
+          Stat stat = new Stat();
+          try {
+            byte[] data = zkClient.getData(getCollectionPropsPath(collection), propsWatcher, stat);
+
+            VersionedCollectionProps props = new VersionedCollectionProps(
+                stat.getVersion(), (Map<String,String>) fromJSON(data));
+            watchedCollectionProps.put(collection, props);
+            properties = props;
+          } catch (KeeperException e) {
+            log.error("KeeperException", e);
+          } catch (InterruptedException e) {
+            ParWork.propegateInterrupt(e);
           }
-        } catch (Exception e) {
-          ParWork.propegateInterrupt(e);
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
+
         }
       }
-      return properties;
     }
+
+    return properties.props;
   }
 
   private class VersionedCollectionProps {
@@ -1245,38 +1250,38 @@ public class ZkStateReader implements SolrCloseable {
     return COLLECTIONS_ZKNODE + '/' + collection + '/' + COLLECTION_PROPS_ZKNODE;
   }
 
-  @SuppressWarnings("unchecked")
-  private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
-    final String znodePath = getCollectionPropsPath(collection);
-    // lazy init cache cleaner once we know someone is using collection properties.
-    if (collectionPropsCacheCleaner == null) {
-      synchronized (this) { // There can be only one! :)
-        if (collectionPropsCacheCleaner == null) {
-          collectionPropsCacheCleaner = notifications.submit(new CacheCleaner());
-        }
-      }
-    }
-    while (true) {
-      try {
-        Stat stat = new Stat();
-        byte[] data = zkClient.getData(znodePath, watcher, stat);
-        return new VersionedCollectionProps(stat.getVersion(), (Map<String, String>) Utils.fromJSON(data));
-      } catch (ClassCastException e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
-      } catch (KeeperException.NoNodeException e) {
-        if (watcher != null) {
-          // Leave an exists watch in place in case a collectionprops.json is created later.
-          Stat exists = zkClient.exists(znodePath, watcher);
-          if (exists != null) {
-            // Rare race condition, we tried to fetch the data and couldn't find it, then we found it exists.
-            // Loop and try again.
-            continue;
-          }
-        }
-        return new VersionedCollectionProps(-1, EMPTY_MAP);
-      }
-    }
-  }
+//  @SuppressWarnings("unchecked")
+//  private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
+//    final String znodePath = getCollectionPropsPath(collection);
+//    // lazy init cache cleaner once we know someone is using collection properties.
+//    if (collectionPropsCacheCleaner == null) {
+//      synchronized (this) { // There can be only one! :)
+//        if (collectionPropsCacheCleaner == null) {
+//          collectionPropsCacheCleaner = notifications.submit(new CacheCleaner());
+//        }
+//      }
+//    }
+//    while (true) {
+//      try {
+//        Stat stat = new Stat();
+//        byte[] data = zkClient.getData(znodePath, watcher, stat);
+//        return new VersionedCollectionProps(stat.getVersion(), (Map<String, String>) Utils.fromJSON(data));
+//      } catch (ClassCastException e) {
+//        throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
+//      } catch (KeeperException.NoNodeException e) {
+//        if (watcher != null) {
+//          // Leave an exists watch in place in case a collectionprops.json is created later.
+//          Stat exists = zkClient.exists(znodePath, watcher);
+//          if (exists != null) {
+//            // Rare race condition, we tried to fetch the data and couldn't find it, then we found it exists.
+//            // Loop and try again.
+//            continue;
+//          }
+//        }
+//        return new VersionedCollectionProps(-1, EMPTY_MAP);
+//      }
+//    }
+//  }
 
   /**
    * Returns the content of /security.json from ZooKeeper as a Map
@@ -1425,11 +1430,6 @@ public class ZkStateReader implements SolrCloseable {
       watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis, TimeUnit.MILLISECONDS);
     }
 
-    public PropsWatcher renew(long forMillis) {
-      watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis, TimeUnit.MILLISECONDS);
-      return this;
-    }
-
     @Override
     public void process(WatchedEvent event) {
       // session events are not change events, and do not remove the watcher
@@ -1437,59 +1437,76 @@ public class ZkStateReader implements SolrCloseable {
         return;
       }
 
-      boolean expired = System.nanoTime() > watchUntilNs;
-      if (!collectionPropsObservers.containsKey(coll) && expired) {
-        // No one can be notified of the change, we can ignore it and "unset" the watch
-        log.debug("Ignoring property change for collection {}", coll);
-        return;
-      }
+      if (EventType.NodeDataChanged.equals(event.getType())) {
+        // load it
+        Stat stat = new Stat();
+        try {
+          byte[] data = zkClient.getData(getCollectionPropsPath(coll), this, stat);
 
-      log.info("A collection property change: [{}] for collection [{}] has occurred - updating...",
-          event, coll);
+          VersionedCollectionProps props = new VersionedCollectionProps(
+              stat.getVersion(), (Map<String,String>) fromJSON(data));
+          watchedCollectionProps.put(coll, props);
 
-      refreshAndWatch(true);
+          try (ParWork work = new ParWork(this, true)) {
+            for (CollectionPropsWatcher observer : collectionPropsObservers.values()) {
+              work.collect(() -> {
+                observer.onStateChanged(props.props);
+              });
+            }
+          }
+
+          //        Stat stat = new Stat();
+          //        byte[] data = zkClient.getData(znodePath, watcher, stat);
+          //        return new VersionedCollectionProps(stat.getVersion(), (Map<String, String>) Utils.fromJSON(data));
+        } catch (KeeperException e) {
+          log.error("", e);
+        } catch (InterruptedException e) {
+          ParWork.propegateInterrupt(e);
+        }
+      }
+          
     }
 
     /**
      * Refresh collection properties from ZK and leave a watch for future changes. Updates the properties in
      * watchedCollectionProps with the results of the refresh. Optionally notifies watchers
      */
-    void refreshAndWatch(boolean notifyWatchers) {
-      try {
-        synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
-          VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
-          Map<String, String> properties = vcp.props;
-          VersionedCollectionProps existingVcp = watchedCollectionProps.get(coll);
-          if (existingVcp == null ||                   // never called before, record what we found
-              vcp.zkVersion > existingVcp.zkVersion || // newer info we should update
-              vcp.zkVersion == -1) {                   // node was deleted start over
-            watchedCollectionProps.put(coll, vcp);
-            if (notifyWatchers) {
-              notifyPropsWatchers(coll, properties);
-            }
-            if (vcp.zkVersion == -1 && existingVcp != null) { // Collection DELETE detected
-
-              // We should not be caching a collection that has been deleted.
-              watchedCollectionProps.remove(coll);
-
-              // core ref counting not relevant here, don't need canRemove(), we just sent
-              // a notification of an empty set of properties, no reason to watch what doesn't exist.
-              collectionPropsObservers.remove(coll);
-
-              // This is the one time we know it's safe to throw this out. We just failed to set the watch
-              // due to an NoNodeException, so it isn't held by ZK and can't re-set itself due to an update.
-              collectionPropsWatchers.remove(coll);
-            }
-          }
-        }
-      } catch (KeeperException e) {
-        log.error("Lost collection property watcher for {} due to ZK error", coll, e);
-        throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        log.error("Lost collection property watcher for {} due to the thread being interrupted", coll, e);
-      }
-    }
+//    void refreshAndWatch(boolean notifyWatchers) {
+//      try {
+//        synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
+//          VersionedCollectionProps vcp = fetchCollectionProperties(coll, this);
+//          Map<String, String> properties = vcp.props;
+//          VersionedCollectionProps existingVcp = watchedCollectionProps.get(coll);
+//          if (existingVcp == null ||                   // never called before, record what we found
+//              vcp.zkVersion > existingVcp.zkVersion || // newer info we should update
+//              vcp.zkVersion == -1) {                   // node was deleted start over
+//            watchedCollectionProps.put(coll, vcp);
+//            if (notifyWatchers) {
+//              notifyPropsWatchers(coll, properties);
+//            }
+//            if (vcp.zkVersion == -1 && existingVcp != null) { // Collection DELETE detected
+//
+//              // We should not be caching a collection that has been deleted.
+//              watchedCollectionProps.remove(coll);
+//
+//              // core ref counting not relevant here, don't need canRemove(), we just sent
+//              // a notification of an empty set of properties, no reason to watch what doesn't exist.
+//              collectionPropsObservers.remove(coll);
+//
+//              // This is the one time we know it's safe to throw this out. We just failed to set the watch
+//              // due to an NoNodeException, so it isn't held by ZK and can't re-set itself due to an update.
+//              collectionPropsWatchers.remove(coll);
+//            }
+//          }
+//        }
+//      } catch (KeeperException e) {
+//        log.error("Lost collection property watcher for {} due to ZK error", coll, e);
+//        throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+//      } catch (InterruptedException e) {
+//        Thread.currentThread().interrupt();
+//        log.error("Lost collection property watcher for {} due to the thread being interrupted", coll, e);
+//      }
+//    }
   }
 
   /**
@@ -1974,38 +1991,15 @@ public class ZkStateReader implements SolrCloseable {
     return updated;
   }
 
-  public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) {
-    AtomicBoolean watchSet = new AtomicBoolean(false);
-    collectionPropsObservers.compute(collection, (k, v) -> {
-      if (v == null) {
-        v = new CollectionWatch<>();
-        watchSet.set(true);
-      }
-      v.stateWatchers.add(propsWatcher);
-      return v;
-    });
-
-    if (watchSet.get()) {
-      collectionPropsWatchers.computeIfAbsent(collection, PropsWatcher::new).refreshAndWatch(false);
-    }
+  public void removeCollectionPropsWatcher(String collection, CollectionPropsWatcher watcher) {
+    collectionPropsObservers.remove(collection);
   }
 
-  public void removeCollectionPropsWatcher(String collection, CollectionPropsWatcher watcher) {
-    collectionPropsObservers.compute(collection, (k, v) -> {
-      if (v == null)
-        return null;
-      v.stateWatchers.remove(watcher);
-      if (v.canBeRemoved()) {
-        // don't want this to happen in middle of other blocks that might add it back.
-        synchronized (watchedCollectionProps) {
-          watchedCollectionProps.remove(collection);
-        }
-        return null;
-      }
-      return v;
-    });
+  public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) {
+    collectionPropsObservers.put(collection, propsWatcher);
   }
 
+
   public static class ConfigData {
     public Map<String, Object> data;
     public int version;
@@ -2239,45 +2233,6 @@ public class ZkStateReader implements SolrCloseable {
 
   }
 
-  private void notifyPropsWatchers(String collection, Map<String, String> properties) {
-    try {
-      collectionPropsNotifications.submit(new PropsNotification(collection, properties));
-    } catch (RejectedExecutionException e) {
-      if (!closed) {
-        log.error("Couldn't run collection properties notifications for {}", collection, e);
-      }
-    }
-  }
-
-  private class PropsNotification implements Runnable {
-
-    private final String collection;
-    private final Map<String, String> collectionProperties;
-    private final List<CollectionPropsWatcher> watchers = new ArrayList<>();
-
-    private PropsNotification(String collection, Map<String, String> collectionProperties) {
-      this.collection = collection;
-      this.collectionProperties = collectionProperties;
-      // guarantee delivery of notification regardless of what happens to collectionPropsObservers
-      // while we wait our turn in the executor by capturing the list on creation.
-      collectionPropsObservers.compute(collection, (k, v) -> {
-        if (v == null)
-          return null;
-        watchers.addAll(v.stateWatchers);
-        return v;
-      });
-    }
-
-    @Override
-    public void run() {
-      for (CollectionPropsWatcher watcher : watchers) {
-        if (watcher.onStateChanged(collectionProperties)) {
-          removeCollectionPropsWatcher(collection, watcher);
-        }
-      }
-    }
-  }
-
   private class CacheCleaner implements Runnable {
     public void run() {
       while (!Thread.interrupted()) {


[lucene-solr] 02/02: @483 Lean into that chisel.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 821ff29eb70f3239b0d600c6997118d583667a76
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Aug 1 11:52:24 2020 -0500

    @483 Lean into that chisel.
---
 .../java/org/apache/solr/cloud/ZkController.java   | 12 +++++----
 .../apache/solr/common/cloud/ZkStateReader.java    | 30 ++++++----------------
 2 files changed, 15 insertions(+), 27 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index fe74022..809e4b0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -623,9 +623,7 @@ public class ZkController implements Closeable {
     this.shudownCalled = true;
 
     this.isClosed = true;
-    if (overseer != null) {
-      overseer.closeAndDone();
-    }
+
     try (ParWork closer = new ParWork(this, true)) {
       closer.collect(electionContexts.values());
       closer.collect(collectionToTerms.values());
@@ -633,10 +631,14 @@ public class ZkController implements Closeable {
       closer.collect(cloudManager);
       closer.collect(cloudSolrClient);
       closer.collect(replicateFromLeaders.values());
+      closer.collect(overseerContexts.values());
       closer.addCollect("internals");
 
-      closer.collect(overseerContexts.values());
-      closer.collect(overseer);
+      closer.collect(() -> {
+        if (overseer != null) {
+          overseer.closeAndDone();
+        }
+      });
       closer.addCollect("overseer");
       closer.collect(zkStateReader);
       closer.addCollect("zkStateReader");
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 6beea1a..9998af8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -226,14 +226,8 @@ public class ZkStateReader implements SolrCloseable {
 
   private Set<ClusterPropertiesListener> clusterPropertiesListeners = ConcurrentHashMap.newKeySet();
 
-  /**
-   * Used to submit notifications to Collection Properties watchers in order
-   **/
-  private final ExecutorService collectionPropsNotifications = ParWork.getExecutor();
-
   private static final long LAZY_CACHE_TIME = TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
 
-  private volatile Future<?> collectionPropsCacheCleaner; // only kept to identify if the cleaner has already been started.
 
   /**
    * Get current {@link AutoScalingConfig}.
@@ -914,21 +908,16 @@ public class ZkStateReader implements SolrCloseable {
     this.closed = true;
     try {
       try (ParWork closer = new ParWork(this, true)) {
-        notifications.shutdown();
-        collectionPropsNotifications.shutdown();
-
-        try {
-          collectionPropsCacheCleaner.cancel(true);
-        } catch (NullPointerException e) {
-          // okay
-        }
-        closer.add("waitLatchesReader", () -> {
-          waitLatches.forEach((w) -> w.countDown());
-          return null;
-        });
+//        closer.add("waitLatchesReader", () -> {
+//          waitLatches.forEach((w) -> w.countDown());
+//          return null;
+//        });
 
         closer
-            .add("notifications", notifications, collectionPropsNotifications);
+            .add("notifications", notifications, () -> {
+              waitLatches.forEach((w) -> w.countDown());
+              return null;
+            });
 
         if (closeClient) {
           closer.add("zkClient", zkClient);
@@ -2015,9 +2004,6 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   private void notifyStateWatchers(String collection, DocCollection collectionState) {
-    if (this.closed) {
-      return;
-    }
     try {
       notifications.submit(new Notification(collection, collectionState));
     } catch (RejectedExecutionException e) {