You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2015/03/25 23:47:01 UTC

svn commit: r1669240 - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ZkController.java

Author: noble
Date: Wed Mar 25 22:47:00 2015
New Revision: 1669240

URL: http://svn.apache.org/r1669240
Log:
SOLR-6924: conf node listening made more robust to take care of session expiry and reconnect

Modified:
    lucene/dev/branches/branch_5x/   (props changed)
    lucene/dev/branches/branch_5x/solr/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1669240&r1=1669239&r2=1669240&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Mar 25 22:47:00 2015
@@ -2304,10 +2304,10 @@ public final class ZkController {
   private final Map<String , Set<Runnable>> confDirectoryListeners =  new HashMap<>();
 
   void watchZKConfDir(final String zkDir) {
-    log.info("watch zkdir " + zkDir);
+    log.info("watch zkdir {}" , zkDir);
     if (!confDirectoryListeners.containsKey(zkDir)) {
       confDirectoryListeners.put(zkDir,  new HashSet<Runnable>());
-      setConfWatcher(zkDir, new WatcherImpl(zkDir));
+      setConfWatcher(zkDir, new WatcherImpl(zkDir), null);
 
     }
 
@@ -2315,54 +2315,70 @@ public final class ZkController {
   }
   private class WatcherImpl implements Watcher{
     private final String zkDir ;
-
     private WatcherImpl(String dir) {
       this.zkDir = dir;
     }
 
     @Override
-      public void process(WatchedEvent event) {
-        try {
-
-          synchronized (confDirectoryListeners) {
-            // if this is not among directories to be watched then don't set the watcher anymore
-            if( !confDirectoryListeners.containsKey(zkDir)) {
-              log.info("Watcher on {} is removed ", zkDir);
-              return;
-            }
-            Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
-            if (listeners != null && !listeners.isEmpty()) {
-              final Set<Runnable> listenersCopy = new HashSet<>(listeners);
-              new Thread() {
-                //run these in a separate thread because this can be long running
-                public void run() {
-                  for (final Runnable listener : listenersCopy) {
-                    try {
-                      listener.run();
-                    } catch (Exception e) {
-                      log.warn("listener throws error", e);
-                    }
-                  }
-                }
-              }.start();
-            }
-
-          }
+    public void process(WatchedEvent event) {
+      Stat stat = null;
+      try {
+        stat = zkClient.exists(zkDir, null, true);
+      } catch (KeeperException e) {
+        //ignore , it is not a big deal
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
 
+      boolean resetWatcher = false;
+      try {
+          resetWatcher = fireEventListeners(this.zkDir);
         } finally {
           if (Event.EventType.None.equals(event.getType())) {
             log.info("A node got unwatched for {}", zkDir);
-            return;
           } else {
-            setConfWatcher(zkDir,this);
+            if(resetWatcher) setConfWatcher(zkDir,this,stat);
           }
         }
       }
+
+  }
+  private boolean fireEventListeners(String zkDir) {
+    synchronized (confDirectoryListeners) {
+      // if this is not among directories to be watched then don't set the watcher anymore
+      if( !confDirectoryListeners.containsKey(zkDir)) {
+        log.info("Watcher on {} is removed ", zkDir);
+        return false;
+      }
+      Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
+      if (listeners != null && !listeners.isEmpty()) {
+        final Set<Runnable> listenersCopy = new HashSet<>(listeners);
+        new Thread() {
+          //run these in a separate thread because this can be long running
+          public void run() {
+            for (final Runnable listener : listenersCopy) {
+              try {
+                listener.run();
+              } catch (Exception e) {
+                log.warn("listener throws error", e);
+              }
+            }
+          }
+        }.start();
+      }
+
     }
+    return true;
+  }
 
-  private void setConfWatcher(String zkDir, Watcher watcher) {
+  private void setConfWatcher(String zkDir, Watcher watcher, Stat stat) {
     try {
-      zkClient.exists(zkDir, watcher, true);
+      Stat newStat = zkClient.exists(zkDir, watcher, true);
+      if (stat != null && newStat.getVersion() > stat.getVersion()) {
+        //a race condition where a we missed an even fired
+        //so fire the event listeners
+        fireEventListeners(zkDir);
+      }
     } catch (KeeperException e) {
       log.error("failed to set watcher for conf dir {} ", zkDir);
     } catch (InterruptedException e) {
@@ -2378,6 +2394,7 @@ public final class ZkController {
         synchronized (confDirectoryListeners){
           for (String s : confDirectoryListeners.keySet()) {
             watchZKConfDir(s);
+            fireEventListeners(s);
           }
         }
       }