You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/21 08:00:13 UTC

[05/29] incubator-distributedlog git commit: DL: remove watches when unregistering children watches

DL: remove watches when unregistering children watches

RB_ID=833858


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/b571d3b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/b571d3b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/b571d3b4

Branch: refs/heads/merge/DL-98
Commit: b571d3b4adcc140acca881979782474c27459d34
Parents: 517c77c
Author: Sijie Guo <si...@twitter.com>
Authored: Mon May 23 21:01:57 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:35:26 2016 -0800

----------------------------------------------------------------------
 .../twitter/distributedlog/ZooKeeperClient.java |  1 +
 .../distributedlog/zk/ZKWatcherManager.java     | 34 +++++++++++++++++++-
 .../distributedlog/zk/TestZKWatcherManager.java |  1 +
 3 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b571d3b4/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
index 912d592..9ea9e37 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
@@ -169,6 +169,7 @@ public class ZooKeeperClient {
         this.credentials = credentials;
         this.watcherManager = ZKWatcherManager.newBuilder()
                 .name(name)
+                .zkc(this)
                 .statsLogger(statsLogger.scope("watcher_manager"))
                 .build();
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b571d3b4/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
index 4068737..a24b560 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
@@ -17,8 +17,11 @@
  */
 package com.twitter.distributedlog.zk;
 
+import com.twitter.distributedlog.ZooKeeperClient;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
@@ -50,31 +53,40 @@ public class ZKWatcherManager implements Watcher {
 
         private String _name;
         private StatsLogger _statsLogger;
+        private ZooKeeperClient _zkc;
 
         public Builder name(String name) {
             this._name = name;
             return this;
         }
 
+        public Builder zkc(ZooKeeperClient zkc) {
+            this._zkc = zkc;
+            return this;
+        }
+
         public Builder statsLogger(StatsLogger statsLogger) {
             this._statsLogger = statsLogger;
             return this;
         }
 
         public ZKWatcherManager build() {
-            return new ZKWatcherManager(_name, _statsLogger);
+            return new ZKWatcherManager(_name, _zkc, _statsLogger);
         }
     }
 
     private final String name;
+    private final ZooKeeperClient zkc;
     private final StatsLogger statsLogger;
 
     protected final ConcurrentMap<String, Set<Watcher>> childWatches;
     protected final AtomicInteger allWatchesGauge;
 
     private ZKWatcherManager(String name,
+                             ZooKeeperClient zkc,
                              StatsLogger statsLogger) {
         this.name = name;
+        this.zkc = zkc;
         this.statsLogger = statsLogger;
 
         // watches
@@ -141,6 +153,26 @@ public class ZKWatcherManager implements Watcher {
                 logger.warn("Remove a non-registered child watcher {} from path {}", watcher, path);
             }
             if (watchers.isEmpty()) {
+                // best-efforts to remove watches
+                try {
+                    if (null != zkc) {
+                        zkc.get().removeWatches(path, this, WatcherType.Children, true, new AsyncCallback.VoidCallback() {
+                            @Override
+                            public void processResult(int rc, String path, Object ctx) {
+                                if (KeeperException.Code.OK.intValue() == rc) {
+                                    logger.debug("Successfully removed children watches from {}", path);
+                                } else {
+                                    logger.debug("Encountered exception on removing children watches from {}",
+                                            path, KeeperException.create(KeeperException.Code.get(rc)));
+                                }
+                            }
+                        }, null);
+                    }
+                } catch (InterruptedException e) {
+                    logger.debug("Encountered exception on removing watches from {}", path, e);
+                } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+                    logger.debug("Encountered exception on removing watches from {}", path, e);
+                }
                 childWatches.remove(path, watchers);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b571d3b4/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
index ee00ab9..6f269c3 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
@@ -34,6 +34,7 @@ public class TestZKWatcherManager {
     public void testRegisterUnregisterWatcher() throws Exception {
         ZKWatcherManager watcherManager = ZKWatcherManager.newBuilder()
                 .name("test-register-unregister-watcher")
+                .zkc(null)
                 .statsLogger(NullStatsLogger.INSTANCE)
                 .build();
         String path = "/test-register-unregister-watcher";