Posted to issues@solr.apache.org by "noblepaul (via GitHub)" <gi...@apache.org> on 2023/04/24 13:40:16 UTC

[GitHub] [solr] noblepaul opened a new pull request, #1585: Use Zookeeper persistent recursive Zookeeper watches

noblepaul opened a new pull request, #1585:
URL: https://github.com/apache/solr/pull/1585

   PoC


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@solr.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@solr.apache.org
For additional commands, e-mail: issues-help@solr.apache.org


Re: [PR] Use Zookeeper persistent recursive Zookeeper watches [solr]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #1585:
URL: https://github.com/apache/solr/pull/1585#issuecomment-1947529644

   This PR had no visible activity in the past 60 days, labeling it as stale. Any new activity will remove the stale label. To attract more reviewers, please tag someone or notify the dev@solr.apache.org mailing list. Thank you for your contribution!



[GitHub] [solr] justinrsweeney commented on a diff in pull request #1585: Use Zookeeper persistent recursive Zookeeper watches

Posted by "justinrsweeney (via GitHub)" <gi...@apache.org>.
justinrsweeney commented on code in PR #1585:
URL: https://github.com/apache/solr/pull/1585#discussion_r1205796738


##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -326,14 +326,14 @@ private boolean updateDocCollection(String collection, DocCollection newState) {
               log.debug("Removing cached collection state for [{}]", collection);
               watch.currentState = null;
             } else { // both new and old states are non-null
-              int oldCVersion =
+              long oldCVersion =

Review Comment:
   Should we change these variable names since we are no longer using cversion?



##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -368,10 +368,152 @@ private StatefulCollectionWatch compute(
         BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) {
       return statefulWatchesByCollectionName.compute(collectionName, remappingFunction);
     }
+
+    public boolean isWatched(String coll) {
+      return statefulWatchesByCollectionName.containsKey(coll);
+    }
   }
 
-  private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
+  private class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
     private DocCollection currentState;
+
+    Watcher persistentWatcher;
+
+    final String coll;
+    private final String collectionPath;
+
+    private StatefulCollectionWatch(String coll) {
+      this.coll = coll;
+      this.collectionPath = DocCollection.getCollectionPath(coll);
+    }
+
+    public void handleWatch(WatchedEvent event) {
+      if (!collectionWatches.isWatched(coll)) {
+        return;
+      }
+      if (log.isTraceEnabled()) {
+        log.trace("an event happened for {}, event: {}", coll, event.toString());
+      }
+      DocCollection collectionState = getCollection(coll);
+      if (collectionPath.equals(event.getPath())) {
+        DocCollection newState = null;
+        try {
+          newState = fetchCollectionState(coll, null);
+        } catch (KeeperException.SessionExpiredException
+            | KeeperException.ConnectionLossException e) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: ", e);
+        } catch (KeeperException e) {
+          log.error("exception for collection: [{}]", coll, e);
+          throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.error("exception for collection: [{}]", coll, e);
+        }
+        collectionWatches.updateDocCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+      }
+      if (collectionState != null && collectionState.isPerReplicaState()) {
+        String path = event.getPath();
+        if ((event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted)
+            && path.length() > collectionState.getZNode().length()
+            && path.startsWith(collectionPath)) {
+          if (log.isDebugEnabled()) {
+            log.debug("PRS node event : {}", event.getType());
+          }
+
+          PerReplicaStates prs = collectionState.getPerReplicaStates();
+          if (prs != null) {
+            String stateStr = path.substring(collectionPath.length() + 1);
+            PerReplicaStates.State newState = PerReplicaStates.State.parse(stateStr);
+            PerReplicaStates.State oldState = prs.states.get(newState.replica);
+            if (event.getType() == EventType.NodeCreated) {
+              if (oldState != null && newState.version < oldState.version) {
+                // we got a notification out of order? . Shouldn't happen
+                if (log.isTraceEnabled()) {
+                  log.trace("newState {} < oldState {}", newState, oldState);

Review Comment:
   I think we should bump up this log level, assuming this is something that shouldn't happen regularly. I could see this being warn level.
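
As an illustration of the suggestion above, here is a minimal sketch of reporting an out-of-order event at warning level. It is not code from the PR: `OutOfOrderCheck` and `isOutOfOrder` are hypothetical names, and `java.util.logging` stands in for the SLF4J logger that the diff uses (where the equivalent change would presumably be `log.warn(...)` in place of `log.trace(...)`).

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper for illustration; not part of ZkStateReader. */
final class OutOfOrderCheck {
  private static final Logger LOG = Logger.getLogger(OutOfOrderCheck.class.getName());

  private OutOfOrderCheck() {}

  /**
   * Returns true when the incoming version is older than the cached one.
   * A null cached version means nothing is cached, so nothing can be out of order.
   */
  static boolean isOutOfOrder(String replica, int newVersion, Integer cachedVersion) {
    if (cachedVersion == null || newVersion >= cachedVersion) {
      return false;
    }
    // WARNING instead of a trace-level message: the case is expected to be rare,
    // so it should be visible with default logging settings.
    LOG.log(
        Level.WARNING,
        "Out-of-order PRS event for {0}: new version {1} < cached version {2}",
        new Object[] {replica, newVersion, cachedVersion});
    return true;
  }
}
```

Because the message is only emitted on the rare path, no `isWarnEnabled()`-style guard is needed in this sketch.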




[GitHub] [solr] hiteshk25 commented on a diff in pull request #1585: Use Zookeeper persistent recursive Zookeeper watches

Posted by "hiteshk25 (via GitHub)" <gi...@apache.org>.
hiteshk25 commented on code in PR #1585:
URL: https://github.com/apache/solr/pull/1585#discussion_r1187594622


##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -368,10 +368,152 @@ private StatefulCollectionWatch compute(
         BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) {
       return statefulWatchesByCollectionName.compute(collectionName, remappingFunction);
     }
+
+    public boolean isWatched(String coll) {
+      return statefulWatchesByCollectionName.containsKey(coll);
+    }
   }
 
-  private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
+  private class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
     private DocCollection currentState;
+
+    Watcher persistentWatcher;
+
+    final String coll;
+    private final String collectionPath;
+
+    private StatefulCollectionWatch(String coll) {
+      this.coll = coll;
+      this.collectionPath = DocCollection.getCollectionPath(coll);
+    }
+
+    public void handleWatch(WatchedEvent event) {
+      if (!collectionWatches.isWatched(coll)) {
+        return;
+      }
+      if (log.isTraceEnabled()) {
+        log.trace("an event happened for {}, event: {}", coll, event.toString());
+      }
+      DocCollection collectionState = getCollection(coll);
+      if (collectionPath.equals(event.getPath())) {
+        DocCollection newState = null;
+        try {
+          newState = fetchCollectionState(coll, null);
+        } catch (KeeperException.SessionExpiredException
+            | KeeperException.ConnectionLossException e) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: ", e);
+        } catch (KeeperException e) {
+          log.error("exception for collection: [{}]", coll, e);
+          throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.error("exception for collection: [{}]", coll, e);
+        }
+        collectionWatches.updateDocCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+      }
+      if (collectionState != null && collectionState.isPerReplicaState()) {
+        String path = event.getPath();
+        if ((event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted)
+            && path.length() > collectionState.getZNode().length()
+            && path.startsWith(collectionPath)) {
+          if (log.isDebugEnabled()) {
+            log.debug("PRS node event : {}", event.getType());
+          }
+
+          PerReplicaStates prs = collectionState.getPerReplicaStates();
+          if (prs != null) {
+            String stateStr = path.substring(collectionPath.length() + 1);
+            PerReplicaStates.State newState = PerReplicaStates.State.parse(stateStr);
+            PerReplicaStates.State oldState = prs.states.get(newState.replica);
+            if (event.getType() == EventType.NodeCreated) {
+              if (oldState != null && newState.version < oldState.version) {

Review Comment:
   On line 439 below we have a null check for `oldState`, but here we are already accessing `oldState`.
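
One way to make the ordering concern explicit is to test for the missing entry before any version comparison. The following is a sketch under assumptions, not the PR's code: `CreatedEventDecision`, `Action`, and `decide` are hypothetical names, and plain integers stand in for the `version` fields of `PerReplicaStates.State`.

```java
/** Hypothetical sketch of the NodeCreated branch; names are illustrative, not Solr's. */
final class CreatedEventDecision {
  enum Action {
    IGNORE_STALE,
    FETCH_ALL,
    UPDATE_IN_PLACE
  }

  private CreatedEventDecision() {}

  /**
   * @param cachedVersion version of the cached entry for this replica, or null if none is cached
   * @param eventVersion version parsed from the name of the created node
   */
  static Action decide(Integer cachedVersion, int eventVersion) {
    // Null check first, so the comparison below can never dereference a missing entry.
    if (cachedVersion == null) {
      return Action.FETCH_ALL;
    }
    if (eventVersion < cachedVersion) {
      return Action.IGNORE_STALE;
    }
    return Action.UPDATE_IN_PLACE;
  }
}
```

With the null case handled up front, each later branch can assume a cached entry exists, which mirrors the three outcomes in the diff (ignore, fetch everything, update the existing entry).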




[GitHub] [solr] hiteshk25 commented on a diff in pull request #1585: Use Zookeeper persistent recursive Zookeeper watches

Posted by "hiteshk25 (via GitHub)" <gi...@apache.org>.
hiteshk25 commented on code in PR #1585:
URL: https://github.com/apache/solr/pull/1585#discussion_r1187635295


##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -368,10 +368,152 @@ private StatefulCollectionWatch compute(
         BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) {
       return statefulWatchesByCollectionName.compute(collectionName, remappingFunction);
     }
+
+    public boolean isWatched(String coll) {
+      return statefulWatchesByCollectionName.containsKey(coll);
+    }
   }
 
-  private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
+  private class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
     private DocCollection currentState;
+
+    Watcher persistentWatcher;
+
+    final String coll;
+    private final String collectionPath;
+
+    private StatefulCollectionWatch(String coll) {
+      this.coll = coll;
+      this.collectionPath = DocCollection.getCollectionPath(coll);
+    }
+
+    public void handleWatch(WatchedEvent event) {
+      if (!collectionWatches.isWatched(coll)) {
+        return;
+      }
+      if (log.isTraceEnabled()) {
+        log.trace("an event happened for {}, event: {}", coll, event.toString());
+      }
+      DocCollection collectionState = getCollection(coll);
+      if (collectionPath.equals(event.getPath())) {
+        DocCollection newState = null;
+        try {
+          newState = fetchCollectionState(coll, null);
+        } catch (KeeperException.SessionExpiredException
+            | KeeperException.ConnectionLossException e) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: ", e);
+        } catch (KeeperException e) {
+          log.error("exception for collection: [{}]", coll, e);
+          throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.error("exception for collection: [{}]", coll, e);
+        }
+        collectionWatches.updateDocCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+      }
+      if (collectionState != null && collectionState.isPerReplicaState()) {
+        String path = event.getPath();
+        if ((event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted)
+            && path.length() > collectionState.getZNode().length()
+            && path.startsWith(collectionPath)) {
+          if (log.isDebugEnabled()) {
+            log.debug("PRS node event : {}", event.getType());
+          }
+
+          PerReplicaStates prs = collectionState.getPerReplicaStates();
+          if (prs != null) {
+            String stateStr = path.substring(collectionPath.length() + 1);
+            PerReplicaStates.State newState = PerReplicaStates.State.parse(stateStr);
+            PerReplicaStates.State oldState = prs.states.get(newState.replica);
+            if (event.getType() == EventType.NodeCreated) {
+              if (oldState != null && newState.version < oldState.version) {
+                // we got a notification out of order? . Shouldn't happen
+                if (log.isTraceEnabled()) {
+                  log.trace("newState {} < oldState {}", newState, oldState);
+                }
+                return;
+              }
+              if (oldState == null) {
+                // the state does not exist now. fetch everything
+
+                if (log.isTraceEnabled()) {
+                  log.trace("fresh replica, force fetch all {}", collectionPath);
+                }
+
+                prs = PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null);
+              } else {
+                // the PRS entry is already available, need to do an update
+                Stat stat = null;
+                try {
+                  // get the stat of this child node
+                  stat = zkClient.exists(path, null, true);

Review Comment:
   What if we get the stat of `state.json` here?




[GitHub] [solr] noblepaul commented on a diff in pull request #1585: Use Zookeeper persistent recursive Zookeeper watches

Posted by "noblepaul (via GitHub)" <gi...@apache.org>.
noblepaul commented on code in PR #1585:
URL: https://github.com/apache/solr/pull/1585#discussion_r1191951328


##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -368,10 +368,152 @@ private StatefulCollectionWatch compute(
         BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) {
       return statefulWatchesByCollectionName.compute(collectionName, remappingFunction);
     }
+
+    public boolean isWatched(String coll) {
+      return statefulWatchesByCollectionName.containsKey(coll);
+    }
   }
 
-  private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
+  private class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
     private DocCollection currentState;
+
+    Watcher persistentWatcher;
+
+    final String coll;
+    private final String collectionPath;
+
+    private StatefulCollectionWatch(String coll) {
+      this.coll = coll;
+      this.collectionPath = DocCollection.getCollectionPath(coll);
+    }
+
+    public void handleWatch(WatchedEvent event) {
+      if (!collectionWatches.isWatched(coll)) {
+        return;
+      }
+      if (log.isTraceEnabled()) {
+        log.trace("an event happened for {}, event: {}", coll, event.toString());
+      }
+      DocCollection collectionState = getCollection(coll);
+      if (collectionPath.equals(event.getPath())) {
+        DocCollection newState = null;
+        try {
+          newState = fetchCollectionState(coll, null);
+        } catch (KeeperException.SessionExpiredException
+            | KeeperException.ConnectionLossException e) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: ", e);
+        } catch (KeeperException e) {
+          log.error("exception for collection: [{}]", coll, e);
+          throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.error("exception for collection: [{}]", coll, e);
+        }
+        collectionWatches.updateDocCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+      }
+      if (collectionState != null && collectionState.isPerReplicaState()) {
+        String path = event.getPath();
+        if ((event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted)
+            && path.length() > collectionState.getZNode().length()
+            && path.startsWith(collectionPath)) {
+          if (log.isDebugEnabled()) {
+            log.debug("PRS node event : {}", event.getType());
+          }
+
+          PerReplicaStates prs = collectionState.getPerReplicaStates();
+          if (prs != null) {
+            String stateStr = path.substring(collectionPath.length() + 1);
+            PerReplicaStates.State newState = PerReplicaStates.State.parse(stateStr);
+            PerReplicaStates.State oldState = prs.states.get(newState.replica);
+            if (event.getType() == EventType.NodeCreated) {
+              if (oldState != null && newState.version < oldState.version) {
+                // we got a notification out of order? . Shouldn't happen
+                if (log.isTraceEnabled()) {
+                  log.trace("newState {} < oldState {}", newState, oldState);
+                }
+                return;
+              }
+              if (oldState == null) {
+                // the state does not exist now. fetch everything
+
+                if (log.isTraceEnabled()) {
+                  log.trace("fresh replica, force fetch all {}", collectionPath);
+                }
+
+                prs = PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null);
+              } else {
+                // the PRS entry is already available, need to do an update
+                Stat stat = null;
+                try {
+                  // get the stat of this child node
+                  stat = zkClient.exists(path, null, true);
+                } catch (Exception e) {
+                  throw new RuntimeException(e);
+                }
+                if (stat == null) {
+                  // this got deleted so soon. nothing to do
+                  return;
+                }
+
+                if (log.isTraceEnabled()) {
+                  log.trace("PRS insert {}, v:{}", newState, stat.getCzxid());
+                }
+                prs = prs.insert(newState, stat.getCzxid());
+                if (prs == null) {
+                  // something went wrong
+                  prs = PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null);
+                }
+              }
+            } else if (event.getType() == EventType.NodeDeleted) {
+              if (oldState == null) {
+                // This is already removed (unlikely, but)
+                return;
+              }
+              if (newState.version < oldState.version) {
+                if (log.isTraceEnabled()) {
+                  log.trace("PRS in-place remove {}", path);
+                }
+                // removed the duplicate without modifying the PRS Object
+                oldState.removeDuplicate(newState);
+                return;
+              } else {
+                if (log.isTraceEnabled()) {
+                  log.trace("Replica:  {} delete, force fetch", path);
+                }
+                // a replica got removed. we can't get the pzxid reliably. fetch everything
+                prs = PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null);

Review Comment:
   `cversion` is not used anymore at all; it's unreliable.




[GitHub] [solr] hiteshk25 commented on a diff in pull request #1585: Use Zookeeper persistent recursive Zookeeper watches

Posted by "hiteshk25 (via GitHub)" <gi...@apache.org>.
hiteshk25 commented on code in PR #1585:
URL: https://github.com/apache/solr/pull/1585#discussion_r1187598453


##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -368,10 +368,152 @@ private StatefulCollectionWatch compute(
         BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) {
       return statefulWatchesByCollectionName.compute(collectionName, remappingFunction);
     }
+
+    public boolean isWatched(String coll) {
+      return statefulWatchesByCollectionName.containsKey(coll);
+    }
   }
 
-  private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
+  private class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
     private DocCollection currentState;
+
+    Watcher persistentWatcher;
+
+    final String coll;
+    private final String collectionPath;
+
+    private StatefulCollectionWatch(String coll) {
+      this.coll = coll;
+      this.collectionPath = DocCollection.getCollectionPath(coll);
+    }
+
+    public void handleWatch(WatchedEvent event) {
+      if (!collectionWatches.isWatched(coll)) {
+        return;
+      }
+      if (log.isTraceEnabled()) {
+        log.trace("an event happened for {}, event: {}", coll, event.toString());
+      }
+      DocCollection collectionState = getCollection(coll);
+      if (collectionPath.equals(event.getPath())) {
+        DocCollection newState = null;
+        try {
+          newState = fetchCollectionState(coll, null);
+        } catch (KeeperException.SessionExpiredException
+            | KeeperException.ConnectionLossException e) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: ", e);
+        } catch (KeeperException e) {
+          log.error("exception for collection: [{}]", coll, e);
+          throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.error("exception for collection: [{}]", coll, e);
+        }
+        collectionWatches.updateDocCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+      }
+      if (collectionState != null && collectionState.isPerReplicaState()) {

Review Comment:
   I think we can move all the PRS event-handling code into the `prs` class instead of keeping it here?
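
As a rough idea of what such an extraction could look like, the sketch below pulls the path checks from `handleWatch` into a small standalone helper. Everything here is hypothetical: `PrsEventPaths` and `childName` do not exist in Solr, the example paths are made up, and the sketch assumes PRS entries are direct children of the collection path.

```java
import java.util.Optional;

/** Hypothetical helper that isolates the PRS path checks; not part of Solr. */
final class PrsEventPaths {
  private PrsEventPaths() {}

  /**
   * Returns the child node name when eventPath is a direct child of collectionPath,
   * or an empty Optional for the collection node itself, unrelated paths, and deeper descendants.
   */
  static Optional<String> childName(String collectionPath, String eventPath) {
    if (eventPath == null) {
      return Optional.empty();
    }
    // Include the separator in the prefix so "/a/c1" does not match "/a/c10/...".
    String prefix = collectionPath + "/";
    if (!eventPath.startsWith(prefix) || eventPath.length() == prefix.length()) {
      return Optional.empty();
    }
    String child = eventPath.substring(prefix.length());
    // Assumption: only direct children carry per-replica state.
    return child.indexOf('/') >= 0 ? Optional.empty() : Optional.of(child);
  }
}
```

For example, `PrsEventPaths.childName("/collections/c1/state.json", "/collections/c1/state.json/core_node1:2:A")` yields `Optional.of("core_node1:2:A")`, while passing the collection path itself yields an empty `Optional`. Keeping this logic free of ZooKeeper types makes it testable without a running ensemble.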




[GitHub] [solr] noblepaul commented on a diff in pull request #1585: Use Zookeeper persistent recursive Zookeeper watches

Posted by "noblepaul (via GitHub)" <gi...@apache.org>.
noblepaul commented on code in PR #1585:
URL: https://github.com/apache/solr/pull/1585#discussion_r1193817402


##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -368,10 +368,152 @@ private StatefulCollectionWatch compute(
         BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) {
       return statefulWatchesByCollectionName.compute(collectionName, remappingFunction);
     }
+
+    public boolean isWatched(String coll) {
+      return statefulWatchesByCollectionName.containsKey(coll);
+    }
   }
 
-  private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
+  private class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
     private DocCollection currentState;
+
+    Watcher persistentWatcher;
+
+    final String coll;
+    private final String collectionPath;
+
+    private StatefulCollectionWatch(String coll) {
+      this.coll = coll;
+      this.collectionPath = DocCollection.getCollectionPath(coll);
+    }
+
+    public void handleWatch(WatchedEvent event) {
+      if (!collectionWatches.isWatched(coll)) {
+        return;
+      }
+      if (log.isTraceEnabled()) {
+        log.trace("an event happened for {}, event: {}", coll, event.toString());
+      }
+      DocCollection collectionState = getCollection(coll);
+      if (collectionPath.equals(event.getPath())) {
+        DocCollection newState = null;
+        try {
+          newState = fetchCollectionState(coll, null);
+        } catch (KeeperException.SessionExpiredException
+            | KeeperException.ConnectionLossException e) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: ", e);
+        } catch (KeeperException e) {
+          log.error("exception for collection: [{}]", coll, e);
+          throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.error("exception for collection: [{}]", coll, e);
+        }
+        collectionWatches.updateDocCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+      }
+      if (collectionState != null && collectionState.isPerReplicaState()) {

Review Comment:
   Yeah, we could do a final refactoring once everything else is right.




[GitHub] [solr] sonatype-lift[bot] commented on a diff in pull request #1585: Use Zookeeper persistent recursive Zookeeper watches

Posted by "sonatype-lift[bot] (via GitHub)" <gi...@apache.org>.
sonatype-lift[bot] commented on code in PR #1585:
URL: https://github.com/apache/solr/pull/1585#discussion_r1182303050


##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -368,10 +368,136 @@ private StatefulCollectionWatch compute(
         BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) {
       return statefulWatchesByCollectionName.compute(collectionName, remappingFunction);
     }
+
+    public boolean isWatched(String coll) {
+      return statefulWatchesByCollectionName.containsKey(coll);
+    }
   }
 
-  private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
+  private class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
     private DocCollection currentState;
+
+    Watcher persistentWatcher;
+
+    final String coll;
+    private final String collectionPath;
+
+    private StatefulCollectionWatch(String coll) {
+      this.coll = coll;
+      this.collectionPath = DocCollection.getCollectionPath(coll);
+    }
+
+    public void handleWatch(WatchedEvent event) {
+      if (!collectionWatches.isWatched(coll)) {
+        return;
+      }
+      log.trace("an event happened for {}, event: {}", coll, event.toString());
+      DocCollection collectionState = getCollection(coll);
+      if (collectionPath.equals(event.getPath())) {
+        DocCollection newState = null;
+        try {
+          newState = fetchCollectionState(coll, null);
+        } catch (KeeperException.SessionExpiredException
+            | KeeperException.ConnectionLossException e) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: ", e);
+        } catch (KeeperException e) {
+          log.error("exception for collection: [{}]", coll, e);
+          throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.error("exception for collection: [{}]", coll, e);
+        }
+        collectionWatches.updateDocCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+      }
+      if (collectionState != null && collectionState.isPerReplicaState()) {
+        String path = event.getPath();
+        if ((event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted)
+                && path.length() > collectionState.getZNode().length()
+                && path.startsWith(collectionPath)) {
+          log.debug("PRS node event : {}", event.getType());
+
+          PerReplicaStates prs = collectionState.getPerReplicaStates();
+          if (prs != null) {
+            String stateStr = path.substring(collectionPath.length() + 1);
+            PerReplicaStates.State newState = PerReplicaStates.State.parse(stateStr);
+            PerReplicaStates.State oldState = prs.states.get(newState.replica);

Review Comment:
   **NULL_DEREFERENCE:** object `newState` last assigned on line 425 could be null and is dereferenced at line 426.
   
   
   





[GitHub] [solr] noblepaul commented on a diff in pull request #1585: Use Zookeeper persistent recursive Zookeeper watches

Posted by "noblepaul (via GitHub)" <gi...@apache.org>.
noblepaul commented on code in PR #1585:
URL: https://github.com/apache/solr/pull/1585#discussion_r1191949973


##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -368,10 +368,152 @@ private StatefulCollectionWatch compute(
         BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) {
       return statefulWatchesByCollectionName.compute(collectionName, remappingFunction);
     }
+
+    public boolean isWatched(String coll) {
+      return statefulWatchesByCollectionName.containsKey(coll);
+    }
   }
 
-  private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
+  private class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
     private DocCollection currentState;
+
+    Watcher persistentWatcher;
+
+    final String coll;
+    private final String collectionPath;
+
+    private StatefulCollectionWatch(String coll) {
+      this.coll = coll;
+      this.collectionPath = DocCollection.getCollectionPath(coll);
+    }
+
+    public void handleWatch(WatchedEvent event) {
+      if (!collectionWatches.isWatched(coll)) {
+        return;
+      }
+      if (log.isTraceEnabled()) {
+        log.trace("an event happened for {}, event: {}", coll, event.toString());
+      }
+      DocCollection collectionState = getCollection(coll);
+      if (collectionPath.equals(event.getPath())) {
+        DocCollection newState = null;
+        try {
+          newState = fetchCollectionState(coll, null);
+        } catch (KeeperException.SessionExpiredException
+            | KeeperException.ConnectionLossException e) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: ", e);
+        } catch (KeeperException e) {
+          log.error("exception for collection: [{}]", coll, e);
+          throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.error("exception for collection: [{}]", coll, e);
+        }
+        collectionWatches.updateDocCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+      }
+      if (collectionState != null && collectionState.isPerReplicaState()) {
+        String path = event.getPath();
+        if ((event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted)
+            && path.length() > collectionState.getZNode().length()
+            && path.startsWith(collectionPath)) {
+          if (log.isDebugEnabled()) {
+            log.debug("PRS node event : {}", event.getType());
+          }
+
+          PerReplicaStates prs = collectionState.getPerReplicaStates();
+          if (prs != null) {
+            String stateStr = path.substring(collectionPath.length() + 1);
+            PerReplicaStates.State newState = PerReplicaStates.State.parse(stateStr);
+            PerReplicaStates.State oldState = prs.states.get(newState.replica);
+            if (event.getType() == EventType.NodeCreated) {
+              if (oldState != null && newState.version < oldState.version) {
+                // we got a notification out of order? . Shouldn't happen
+                if (log.isTraceEnabled()) {
+                  log.trace("newState {} < oldState {}", newState, oldState);
+                }
+                return;
+              }
+              if (oldState == null) {
+                // the state does not exist now. fetch everything
+
+                if (log.isTraceEnabled()) {
+                  log.trace("fresh replica, force fetch all {}", collectionPath);
+                }
+
+                prs = PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null);
+              } else {
+                // the PRS entry is already available, need to do an update
+                Stat stat = null;
+                try {
+                  // get the stat of this child node
+                  stat = zkClient.exists(path, null, true);

Review Comment:
   That's incorrect. The stat of `state.json` would already have been changed by another event.
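
One possible reading of this objection, sketched as a toy model. It assumes the concern is that a child-version counter on the parent (like ZooKeeper's `cversion`, which counts changes to a znode's children) keeps advancing while earlier notifications are still waiting to be handled. The class and method names are hypothetical, and the code uses plain Java collections rather than a ZooKeeper client.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model, not ZooKeeper: a parent whose counter grows on every child
// change, with notifications queued for later delivery.
class StaleStatSketch {
  private int cversion = 0;
  private final Queue<String> pendingEvents = new ArrayDeque<>();

  /** Records a child create/delete: bump the counter, queue the notification. */
  void childChanged(String childName) {
    cversion++;
    pendingEvents.add(childName);
  }

  /** Reads the counter as it is now, not as it was when an event was queued. */
  int readCversion() {
    return cversion;
  }

  /** Delivers the oldest undelivered notification, or null if none is pending. */
  String nextEvent() {
    return pendingEvents.poll();
  }

  public static void main(String[] args) {
    StaleStatSketch parent = new StaleStatSketch();
    parent.childChanged("core_node1:2:A"); // counter becomes 1
    parent.childChanged("core_node2:5:D"); // counter becomes 2 before event 1 is handled

    String first = parent.nextEvent();
    // The handler for the first event sees the counter left by the second change.
    System.out.println(first + " handled at cversion " + parent.readCversion());
    // prints "core_node1:2:A handled at cversion 2"
  }
}
```

Under this reading, a version read from the parent at handling time cannot be attributed to the specific event being handled.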





[GitHub] [solr] hiteshk25 commented on a diff in pull request #1585: Use Zookeeper persistent recursive Zookeeper watches

Posted by "hiteshk25 (via GitHub)" <gi...@apache.org>.
hiteshk25 commented on code in PR #1585:
URL: https://github.com/apache/solr/pull/1585#discussion_r1187636106


##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -368,10 +368,152 @@ private StatefulCollectionWatch compute(
         BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) {
       return statefulWatchesByCollectionName.compute(collectionName, remappingFunction);
     }
+
+    public boolean isWatched(String coll) {
+      return statefulWatchesByCollectionName.containsKey(coll);
+    }
   }
 
-  private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
+  private class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
     private DocCollection currentState;
+
+    Watcher persistentWatcher;
+
+    final String coll;
+    private final String collectionPath;
+
+    private StatefulCollectionWatch(String coll) {
+      this.coll = coll;
+      this.collectionPath = DocCollection.getCollectionPath(coll);
+    }
+
+    public void handleWatch(WatchedEvent event) {
+      if (!collectionWatches.isWatched(coll)) {
+        return;
+      }
+      if (log.isTraceEnabled()) {
+        log.trace("an event happened for {}, event: {}", coll, event.toString());
+      }
+      DocCollection collectionState = getCollection(coll);
+      if (collectionPath.equals(event.getPath())) {
+        DocCollection newState = null;
+        try {
+          newState = fetchCollectionState(coll, null);
+        } catch (KeeperException.SessionExpiredException
+            | KeeperException.ConnectionLossException e) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: ", e);
+        } catch (KeeperException e) {
+          log.error("exception for collection: [{}]", coll, e);
+          throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.error("exception for collection: [{}]", coll, e);
+        }
+        collectionWatches.updateDocCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+      }
+      if (collectionState != null && collectionState.isPerReplicaState()) {
+        String path = event.getPath();
+        if ((event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted)
+            && path.length() > collectionState.getZNode().length()
+            && path.startsWith(collectionPath)) {
+          if (log.isDebugEnabled()) {
+            log.debug("PRS node event : {}", event.getType());
+          }
+
+          PerReplicaStates prs = collectionState.getPerReplicaStates();
+          if (prs != null) {
+            String stateStr = path.substring(collectionPath.length() + 1);
+            PerReplicaStates.State newState = PerReplicaStates.State.parse(stateStr);
+            PerReplicaStates.State oldState = prs.states.get(newState.replica);
+            if (event.getType() == EventType.NodeCreated) {
+              if (oldState != null && newState.version < oldState.version) {
+                // we got a notification out of order? . Shouldn't happen
+                if (log.isTraceEnabled()) {
+                  log.trace("newState {} < oldState {}", newState, oldState);
+                }
+                return;
+              }
+              if (oldState == null) {
+                // the state does not exist now. fetch everything
+
+                if (log.isTraceEnabled()) {
+                  log.trace("fresh replica, force fetch all {}", collectionPath);
+                }
+
+                prs = PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null);
+              } else {
+                // the PRS entry is already available, need to do an update
+                Stat stat = null;
+                try {
+                  // get the stat of this child node
+                  stat = zkClient.exists(path, null, true);
+                } catch (Exception e) {
+                  throw new RuntimeException(e);
+                }
+                if (stat == null) {
+                  // this got deleted so soon. nothing to do
+                  return;
+                }
+
+                if (log.isTraceEnabled()) {
+                  log.trace("PRS insert {}, v:{}", newState, stat.getCzxid());
+                }
+                prs = prs.insert(newState, stat.getCzxid());
+                if (prs == null) {
+                  // something went wrong
+                  prs = PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null);
+                }
+              }
+            } else if (event.getType() == EventType.NodeDeleted) {
+              if (oldState == null) {
+                // This is already removed (unlikely, but)
+                return;
+              }
+              if (newState.version < oldState.version) {
+                if (log.isTraceEnabled()) {
+                  log.trace("PRS in-place remove {}", path);
+                }
+                // removed the duplicate without modifying the PRS Object
+                oldState.removeDuplicate(newState);
+                return;
+              } else {
+                if (log.isTraceEnabled()) {
+                  log.trace("Replica:  {} delete, force fetch", path);
+                }
+                // a replica got removed. we can't get the pzxid reliably. fetch everything
+                prs = PerReplicaStatesFetcher.fetch(collectionPath, zkClient, null);

Review Comment:
   Similarly, can we get the stat of `state.json` and use its cversion?





[GitHub] [solr] noblepaul commented on a diff in pull request #1585: Use Zookeeper persistent recursive Zookeeper watches

Posted by "noblepaul (via GitHub)" <gi...@apache.org>.
noblepaul commented on code in PR #1585:
URL: https://github.com/apache/solr/pull/1585#discussion_r1208801000


##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -368,10 +368,152 @@ private StatefulCollectionWatch compute(
         BiFunction<String, StatefulCollectionWatch, StatefulCollectionWatch> remappingFunction) {
       return statefulWatchesByCollectionName.compute(collectionName, remappingFunction);
     }
+
+    public boolean isWatched(String coll) {
+      return statefulWatchesByCollectionName.containsKey(coll);
+    }
   }
 
-  private static class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
+  private class StatefulCollectionWatch extends CollectionWatch<DocCollectionWatcher> {
     private DocCollection currentState;
+
+    Watcher persistentWatcher;
+
+    final String coll;
+    private final String collectionPath;
+
+    private StatefulCollectionWatch(String coll) {
+      this.coll = coll;
+      this.collectionPath = DocCollection.getCollectionPath(coll);
+    }
+
+    public void handleWatch(WatchedEvent event) {
+      if (!collectionWatches.isWatched(coll)) {
+        return;
+      }
+      if (log.isTraceEnabled()) {
+        log.trace("an event happened for {}, event: {}", coll, event.toString());
+      }
+      DocCollection collectionState = getCollection(coll);
+      if (collectionPath.equals(event.getPath())) {
+        DocCollection newState = null;
+        try {
+          newState = fetchCollectionState(coll, null);
+        } catch (KeeperException.SessionExpiredException
+            | KeeperException.ConnectionLossException e) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: ", e);
+        } catch (KeeperException e) {
+          log.error("exception for collection: [{}]", coll, e);
+          throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.error("exception for collection: [{}]", coll, e);
+        }
+        collectionWatches.updateDocCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+      }
+      if (collectionState != null && collectionState.isPerReplicaState()) {
+        String path = event.getPath();
+        if ((event.getType() == EventType.NodeCreated || event.getType() == EventType.NodeDeleted)
+            && path.length() > collectionState.getZNode().length()
+            && path.startsWith(collectionPath)) {
+          if (log.isDebugEnabled()) {
+            log.debug("PRS node event : {}", event.getType());
+          }
+
+          PerReplicaStates prs = collectionState.getPerReplicaStates();
+          if (prs != null) {
+            String stateStr = path.substring(collectionPath.length() + 1);
+            PerReplicaStates.State newState = PerReplicaStates.State.parse(stateStr);
+            PerReplicaStates.State oldState = prs.states.get(newState.replica);
+            if (event.getType() == EventType.NodeCreated) {
+              if (oldState != null && newState.version < oldState.version) {
+                // we got a notification out of order? . Shouldn't happen
+                if (log.isTraceEnabled()) {
+                  log.trace("newState {} < oldState {}", newState, oldState);

Review Comment:
   Yes, we should.





[GitHub] [solr] noblepaul commented on a diff in pull request #1585: Use Zookeeper persistent recursive Zookeeper watches

Posted by "noblepaul (via GitHub)" <gi...@apache.org>.
noblepaul commented on code in PR #1585:
URL: https://github.com/apache/solr/pull/1585#discussion_r1208800892


##########
solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java:
##########
@@ -326,14 +326,14 @@ private boolean updateDocCollection(String collection, DocCollection newState) {
               log.debug("Removing cached collection state for [{}]", collection);
               watch.currentState = null;
             } else { // both new and old states are non-null
-              int oldCVersion =
+              long oldCVersion =

Review Comment:
   Yes, we should.


