You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/12 19:03:20 UTC

[lucene-solr] branch reference_impl updated: @1466 Quick cleanup for visitors - my office needs the same.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl by this push:
     new fe8df05  @1466 Quick cleanup for visitors - my office needs the same.
fe8df05 is described below

commit fe8df059effddabe79a062e4178c75118b3eef20
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Mar 12 13:02:45 2021 -0600

    @1466 Quick cleanup for visitors - my office needs the same.
    
    Took 26 minutes
---
 .../solr/cloud/ChaosMonkeySafeLeaderTest.java      |  2 +-
 .../apache/solr/common/cloud/ZkStateReader.java    | 78 ++++++++++++++--------
 .../org/apache/solr/cloud/MockZkStateReader.java   |  4 +-
 3 files changed, 52 insertions(+), 32 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
index 88ea75e..0815c69 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
@@ -171,7 +171,7 @@ public class ChaosMonkeySafeLeaderTest extends SolrCloudBridgeTestCase {
       assertTrue(String.valueOf(indexThread.getFailCount()), indexThread.getFailCount() < 10);
     }
 
-    cluster.getSolrClient().getZkStateReader().waitForState(COLLECTION, 60, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+    cluster.getSolrClient().getZkStateReader().waitForState(COLLECTION, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
       if (collectionState == null) return false;
       Collection<Slice> slices = collectionState.getSlices();
       for (Slice slice : slices) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 0c7f14e..12d2671 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -940,7 +940,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         IOUtils.closeQuietly(stateWatcher);
         stateWatcher.removeWatch();
       });
-      stateWatchersMap.clear();
 
       IOUtils.closeQuietly(this.liveNodesWatcher);
       IOUtils.closeQuietly(this.collectionsChildWatcher);
@@ -948,17 +947,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         IOUtils.closeQuietly(zkClient);
       }
 
-      //      if (notifications != null) {
-      //        notifications.shutdownNow();
-      //      }
-
-      //      waitLatches.forEach(c -> { for (int i = 0; i < c.getCount(); i++) c.countDown(); });
-      //      waitLatches.clear();
-
+      stateWatchersMap.clear();
     } finally {
       assert ObjectReleaseTracker.release(this);
     }
-
   }
 
   public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException, TimeoutException {
@@ -2084,16 +2076,15 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       return;
     }
 
-    AtomicBoolean reconstructState = new AtomicBoolean(false);
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null) {
-        reconstructState.set(true);
         v = new CollectionWatch<>(collection);
         CollectionStateWatcher sw = new CollectionStateWatcher(collection);
         stateWatchersMap.put(collection, sw);
         sw.createWatch();
         sw.refresh();
         sw.refreshStateUpdates();
+        return v;
       }
       v.coreRefCount.incrementAndGet();
       return v;
@@ -2187,8 +2178,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
    * <code>onStateChanged</code> returns <code>true</code>
    * </p>
    */
-  public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) {
-    if (log.isDebugEnabled()) log.debug("registerDocCollectionWatcher {}", collection);
+  public void registerDocCollectionWatcher(String collection, DocCollectionWatcher docCollectionWatcher) {
+    log.debug("registerDocCollectionWatcher {}", collection);
 
     if (collection == null) {
       throw new IllegalArgumentException("Collection cannot be null");
@@ -2196,14 +2187,17 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
     collectionWatches.compute(collection, (k, v) -> {
       if (v == null) {
+        log.debug("creating new Collection State watcher for {}", collection);
         v = new CollectionWatch<>(collection);
         CollectionStateWatcher sw = new CollectionStateWatcher(collection);
         stateWatchersMap.put(collection, sw);
+        log.debug("creating watches and refreshing state watcher for {}", collection);
         sw.createWatch();
         sw.refresh();
         sw.refreshStateUpdates();
       }
-      v.stateWatchers.add(stateWatcher);
+      log.debug("Adding a DocCollectionWatcher for collection={} currentCount={}", collection, v.stateWatchers.size());
+      v.stateWatchers.add(docCollectionWatcher);
       return v;
     });
 
@@ -2214,8 +2208,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 //      state = clusterState.getCollectionOrNull(collection);
 //    }
 
-    if (stateWatcher.onStateChanged(state) == true) {
-      removeDocCollectionWatcher(collection, stateWatcher);
+    if (docCollectionWatcher.onStateChanged(state) == true) {
+      removeDocCollectionWatcher(collection, docCollectionWatcher);
     }
 
   }
@@ -2396,16 +2390,16 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       if (v == null) return null;
       v.stateWatchers.remove(watcher);
       if (v.canBeRemoved()) {
-        log.info("no longer watch collection {}", collection);
+        log.debug("no longer watch collection {}", collection);
         watchedCollectionStates.remove(collection);
         LazyCollectionRef docRef = new LazyCollectionRef(collection);
         lazyCollectionStates.put(collection, docRef);
         clusterState.put(collection, docRef);
-        CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
-        if (stateWatcher != null) {
-          IOUtils.closeQuietly(stateWatcher);
-          stateWatcher.removeWatch();
-        }
+//        CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
+//        if (stateWatcher != null) {
+//          IOUtils.closeQuietly(stateWatcher);
+//          stateWatcher.removeWatch();
+//        }
         reconstructState.set(true);
         return null;
       }
@@ -2442,16 +2436,42 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         return true;
       }
 
-      if (live) {
-        return true;
-      }
+//      if (live) {
+//        return true;
+//      }
 
-      watchedCollectionStates.put(coll, newState);
-      if (!collectionWatches.containsKey(coll)) {
-        lazyCollectionStates.remove(coll);
+      boolean updated = false;
+      // CAS update loop
+      while (true) {
+        if (!collectionWatches.containsKey(coll)) {
+          break;
+        }
+        DocCollection oldState = watchedCollectionStates.get(coll);
+        if (oldState == null) {
+          if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
+            if (log.isDebugEnabled()) {
+              log.debug("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion());
+            }
+            updated = true;
+            break;
+          }
+        } else {
+          if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
+            // no change to state, but we might have been triggered by the addition of a
+            // state watcher, so run notifications
+            updated = true;
+            break;
+          }
+          if (watchedCollectionStates.replace(coll, oldState, newState)) {
+            if (log.isDebugEnabled()) {
+              log.debug("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
+            }
+            updated = true;
+            break;
+          }
+        }
       }
 
-
       return true;
     } catch (Exception e) {
       log.error("Failing updating clusterstate", e);
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MockZkStateReader.java b/solr/test-framework/src/java/org/apache/solr/cloud/MockZkStateReader.java
index 01dec842..dea2a7f 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MockZkStateReader.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MockZkStateReader.java
@@ -46,9 +46,9 @@ public class MockZkStateReader extends ZkStateReader {
   }
 
   @Override
-  public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) {
+  public void registerDocCollectionWatcher(String collection, DocCollectionWatcher docCollectionWatcher) {
     // the doc collection will never be changed by this mock
     // so we just call onStateChanged once with the existing DocCollection object an return
-    stateWatcher.onStateChanged(clusterState.get(collection).get());
+    docCollectionWatcher.onStateChanged(clusterState.get(collection).get());
   }
 }