You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2015/03/25 23:47:01 UTC
svn commit: r1669240 - in /lucene/dev/branches/branch_5x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/cloud/ZkController.java
Author: noble
Date: Wed Mar 25 22:47:00 2015
New Revision: 1669240
URL: http://svn.apache.org/r1669240
Log:
SOLR-6924: conf node listening made more robust to take care of session expiry and reconnect
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1669240&r1=1669239&r2=1669240&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Mar 25 22:47:00 2015
@@ -2304,10 +2304,10 @@ public final class ZkController {
private final Map<String , Set<Runnable>> confDirectoryListeners = new HashMap<>();
void watchZKConfDir(final String zkDir) {
- log.info("watch zkdir " + zkDir);
+ log.info("watch zkdir {}" , zkDir);
if (!confDirectoryListeners.containsKey(zkDir)) {
confDirectoryListeners.put(zkDir, new HashSet<Runnable>());
- setConfWatcher(zkDir, new WatcherImpl(zkDir));
+ setConfWatcher(zkDir, new WatcherImpl(zkDir), null);
}
@@ -2315,54 +2315,70 @@ public final class ZkController {
}
private class WatcherImpl implements Watcher{
private final String zkDir ;
-
private WatcherImpl(String dir) {
this.zkDir = dir;
}
@Override
- public void process(WatchedEvent event) {
- try {
-
- synchronized (confDirectoryListeners) {
- // if this is not among directories to be watched then don't set the watcher anymore
- if( !confDirectoryListeners.containsKey(zkDir)) {
- log.info("Watcher on {} is removed ", zkDir);
- return;
- }
- Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
- if (listeners != null && !listeners.isEmpty()) {
- final Set<Runnable> listenersCopy = new HashSet<>(listeners);
- new Thread() {
- //run these in a separate thread because this can be long running
- public void run() {
- for (final Runnable listener : listenersCopy) {
- try {
- listener.run();
- } catch (Exception e) {
- log.warn("listener throws error", e);
- }
- }
- }
- }.start();
- }
-
- }
+ public void process(WatchedEvent event) {
+ Stat stat = null;
+ try {
+ stat = zkClient.exists(zkDir, null, true);
+ } catch (KeeperException e) {
+ //ignore , it is not a big deal
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ boolean resetWatcher = false;
+ try {
+ resetWatcher = fireEventListeners(this.zkDir);
} finally {
if (Event.EventType.None.equals(event.getType())) {
log.info("A node got unwatched for {}", zkDir);
- return;
} else {
- setConfWatcher(zkDir,this);
+ if(resetWatcher) setConfWatcher(zkDir,this,stat);
}
}
}
+
+ }
+ private boolean fireEventListeners(String zkDir) {
+ synchronized (confDirectoryListeners) {
+ // if this is not among directories to be watched then don't set the watcher anymore
+ if( !confDirectoryListeners.containsKey(zkDir)) {
+ log.info("Watcher on {} is removed ", zkDir);
+ return false;
+ }
+ Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
+ if (listeners != null && !listeners.isEmpty()) {
+ final Set<Runnable> listenersCopy = new HashSet<>(listeners);
+ new Thread() {
+ //run these in a separate thread because this can be long running
+ public void run() {
+ for (final Runnable listener : listenersCopy) {
+ try {
+ listener.run();
+ } catch (Exception e) {
+ log.warn("listener throws error", e);
+ }
+ }
+ }
+ }.start();
+ }
+
}
+ return true;
+ }
- private void setConfWatcher(String zkDir, Watcher watcher) {
+ private void setConfWatcher(String zkDir, Watcher watcher, Stat stat) {
try {
- zkClient.exists(zkDir, watcher, true);
+ Stat newStat = zkClient.exists(zkDir, watcher, true);
+ if (stat != null && newStat.getVersion() > stat.getVersion()) {
+ //a race condition where we missed an event that was fired
+ //so fire the event listeners
+ fireEventListeners(zkDir);
+ }
} catch (KeeperException e) {
log.error("failed to set watcher for conf dir {} ", zkDir);
} catch (InterruptedException e) {
@@ -2378,6 +2394,7 @@ public final class ZkController {
synchronized (confDirectoryListeners){
for (String s : confDirectoryListeners.keySet()) {
watchZKConfDir(s);
+ fireEventListeners(s);
}
}
}