You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/05 08:48:41 UTC

[pulsar] branch master updated: Refactor zookeeper session timeout handling into an interface (#6347)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6826040  Refactor zookeeper session timeout handling into an interface (#6347)
6826040 is described below

commit 6826040d32961e3e44f70622bcb89a116935ab68
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Jun 5 16:48:26 2020 +0800

    Refactor zookeeper session timeout handling into an interface (#6347)
    
    Refactor zookeeper session timeout handling into an interface
---
 conf/broker.conf                                   |  6 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 ++
 .../pulsar/ZookeeperSessionExpiredHandlers.java    | 90 ++++++++++++++++++++++
 .../org/apache/pulsar/broker/PulsarService.java    | 12 ++-
 .../pulsar/broker/namespace/NamespaceService.java  | 24 ++++++
 .../pulsar/broker/namespace/OwnershipCache.java    |  4 +
 .../service/web/ZookeeperCacheLoader.java          | 32 ++++++--
 .../zookeeper/LocalZooKeeperConnectionService.java |  4 +-
 .../apache/pulsar/zookeeper/ZooKeeperCache.java    | 48 ++++++++++++
 .../pulsar/zookeeper/ZooKeeperSessionWatcher.java  | 22 +++---
 .../zookeeper/ZookeeperSessionExpiredHandler.java  | 32 ++++++++
 .../LocalZooKeeperConnectionServiceTest.java       | 22 +++++-
 .../zookeeper/ZooKeeperSessionWatcherTest.java     | 51 ++++++++++--
 13 files changed, 323 insertions(+), 31 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2f25984..523f7ad 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -386,6 +386,12 @@ retentionCheckIntervalInSeconds=120
 # Use 0 or negative number to disable the check
 maxNumPartitionsPerPartitionedTopic=0
 
+# Policy applied when the zookeeper session expires. There are two policies: "shutdown" and "reconnect".
+# With the "shutdown" policy, the broker shuts down when the zookeeper session expires.
+# With the "reconnect" policy, the broker tries to reconnect to the zookeeper server and re-register its metadata in zookeeper.
+# Note: the "reconnect" policy is an experimental feature
+zookeeperSessionExpiredPolicy=shutdown
+
 # Enable or disable system topic
 systemTopicEnabled=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 3b58ff7..b91fe0a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -707,6 +707,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int maxNumPartitionsPerPartitionedTopic = 0;
 
+    @FieldContext(
+        doc = "There are two policies when zookeeper session expired happens, \"shutdown\" and \"reconnect\". \n\n"
+        + " If uses \"shutdown\" policy, shutdown the broker when zookeeper session expired happens.\n\n"
+        + " If uses \"reconnect\" policy, try to reconnect to zookeeper server and re-register metadata to zookeeper."
+    )
+    private String zookeeperSessionExpiredPolicy = "shutdown";
+
     /**** --- Messaging Protocols --- ****/
 
     @FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java b/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java
new file mode 100644
index 0000000..e654193
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar;
+
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher;
+import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
+import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
+
+/**
+ * Factory methods and implementations of the {@link ZookeeperSessionExpiredHandler} policies that the
+ * broker service applies when its Zookeeper session expires.
+ */
+public class ZookeeperSessionExpiredHandlers {
+
+    /** Name of the policy implemented by {@link ShutDownWhenSessionExpired}. */
+    public static final String SHUTDOWN_POLICY = "shutdown";
+    /** Name of the policy implemented by {@link ReconnectWhenSessionExpired}. */
+    public static final String RECONNECT_POLICY = "reconnect";
+
+    /**
+     * Creates a handler that shuts the service down when the session expires.
+     *
+     * @param shutdownService service used to perform the shutdown
+     * @return a new {@link ShutDownWhenSessionExpired}
+     */
+    public static ZookeeperSessionExpiredHandler shutdownWhenZookeeperSessionExpired(ShutdownService shutdownService) {
+        return new ShutDownWhenSessionExpired(shutdownService);
+    }
+
+    /**
+     * Creates a handler that re-registers the owned bundles when the session expires.
+     *
+     * @param pulsarService   broker whose namespace service re-registers the owned bundles
+     * @param shutdownService service used to shut down when no namespace service is available
+     * @return a new {@link ReconnectWhenSessionExpired}
+     */
+    public static ZookeeperSessionExpiredHandler reconnectWhenZookeeperSessionExpired(PulsarService pulsarService, ShutdownService shutdownService) {
+        return new ReconnectWhenSessionExpired(pulsarService, shutdownService);
+    }
+
+    /**
+     * Shutdown the messaging service when Zookeeper session expired.
+     */
+    public static class ShutDownWhenSessionExpired implements ZookeeperSessionExpiredHandler {
+
+        private final ShutdownService shutdownService;
+        private ZooKeeperSessionWatcher watcher;
+
+        public ShutDownWhenSessionExpired(ShutdownService shutdownService) {
+            this.shutdownService = shutdownService;
+        }
+
+        @Override
+        public void setWatcher(ZooKeeperSessionWatcher watcher) {
+            this.watcher = watcher;
+        }
+
+        /** Closes the session watcher, then asks the shutdown service to shut down with exit code -1. */
+        @Override
+        public void onSessionExpired() {
+            this.watcher.close();
+            this.shutdownService.shutdown(-1);
+        }
+    }
+
+    /**
+     * Reconnect to the zookeeper server and re-register ownership cache to avoid ownership change.
+     */
+    public static class ReconnectWhenSessionExpired implements ZookeeperSessionExpiredHandler {
+
+        private final PulsarService pulsarService;
+        private ZooKeeperSessionWatcher watcher;
+        private final ShutdownService shutdownService;
+
+        public ReconnectWhenSessionExpired(PulsarService pulsarService, ShutdownService shutdownService) {
+            this.pulsarService = pulsarService;
+            this.shutdownService = shutdownService;
+        }
+
+        /**
+         * Re-registers the owned bundles through the namespace service. When the namespace service is
+         * not available, closes the watcher and shuts down instead.
+         */
+        @Override
+        public void onSessionExpired() {
+            if (this.pulsarService.getNamespaceService() == null) {
+                this.watcher.close();
+                this.shutdownService.shutdown(-1);
+                // shutdown() may return to the caller (e.g. a mocked ShutdownService), so stop here
+                // instead of dereferencing the null namespace service below.
+                return;
+            }
+            this.pulsarService.getNamespaceService().registerOwnedBundles();
+        }
+
+        @Override
+        public void setWatcher(ZooKeeperSessionWatcher watcher) {
+            this.watcher = watcher;
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 689b0a4..b2d5b7a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -132,6 +132,8 @@ import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
 import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
+import org.apache.pulsar.ZookeeperSessionExpiredHandlers;
+import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
@@ -417,7 +419,15 @@ public class PulsarService implements AutoCloseable {
             // Now we are ready to start services
             localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
                     config.getZookeeperServers(), config.getZooKeeperSessionTimeoutMillis());
-            localZooKeeperConnectionProvider.start(shutdownService);
+            ZookeeperSessionExpiredHandler sessionExpiredHandler = null;
+            if (ZookeeperSessionExpiredHandlers.RECONNECT_POLICY.equals(config.getZookeeperSessionExpiredPolicy())) {
+                sessionExpiredHandler = ZookeeperSessionExpiredHandlers.reconnectWhenZookeeperSessionExpired(this, shutdownService);
+            } else if (ZookeeperSessionExpiredHandlers.SHUTDOWN_POLICY.equals(config.getZookeeperSessionExpiredPolicy())) {
+                sessionExpiredHandler = ZookeeperSessionExpiredHandlers.shutdownWhenZookeeperSessionExpired(shutdownService);
+            } else {
+                throw new IllegalArgumentException("Invalid zookeeper session expired policy " + config.getZookeeperSessionExpiredPolicy());
+            }
+            localZooKeeperConnectionProvider.start(sessionExpiredHandler);
 
             // Initialize and start service to access configuration repository.
             this.startZkCacheService();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 943761e..e917db7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -74,6 +74,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -1267,4 +1268,27 @@ public class NamespaceService {
         }
         return isNameSpaceRegistered;
     }
+
+    /**
+     * Re-registers every bundle currently held in the ownership cache.
+     *
+     * <p>The owned bundles are snapshotted before the local owner cache is invalidated. For each bundle,
+     * ownership is re-acquired unless {@code checkRegNodeAndWaitExpired} returns {@code true} for its
+     * path. If that step throws, the bundle is unloaded instead; an unexpected failure of the unload
+     * shuts the broker down.
+     */
+    public void registerOwnedBundles() {
+        List<OwnedBundle> ownedBundles = new ArrayList<>(ownershipCache.getOwnedBundles().values());
+        ownershipCache.invalidateLocalOwnerCache();
+        ownedBundles.forEach(ownedBundle -> {
+            String path = ServiceUnitZkUtils.path(ownedBundle.getNamespaceBundle());
+            try {
+                if (!pulsar.getLocalZkCache().checkRegNodeAndWaitExpired(path)) {
+                    ownershipCache.tryAcquiringOwnership(ownedBundle.getNamespaceBundle());
+                }
+            } catch (Exception e) {
+                // Log the root cause before falling back to unloading, so it is not lost.
+                LOG.warn("Failed to register owned bundle {}, unloading it",
+                    ownedBundle.getNamespaceBundle(), e);
+                try {
+                    ownedBundle.handleUnloadRequest(pulsar, 5, TimeUnit.MINUTES);
+                } catch (IllegalStateException ex) {
+                    // The owned bundle is not in active state.
+                } catch (Exception ex) {
+                    LOG.error("Unexpected exception occur when register owned bundle {}. Shutdown broker now !!!",
+                        ownedBundle.getNamespaceBundle(), ex);
+                    pulsar.getShutdownService().shutdown(-1);
+                }
+            }
+        });
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 7acd5f8..3d09b96 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -372,6 +372,10 @@ public class OwnershipCache {
         }
     }
 
+    /**
+     * Invalidates every entry held in {@code ownedBundlesCache}.
+     */
+    public void invalidateLocalOwnerCache() {
+        ownedBundlesCache.synchronous().invalidateAll();
+    }
+
     public NamespaceEphemeralData getSelfOwnerInfo() {
         return selfOwnerInfo;
     }
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
index 66341fd..56332e7 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
@@ -34,6 +34,8 @@ import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher;
+import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,17 +68,33 @@ public class ZookeeperCacheLoader implements Closeable {
             int zookeeperSessionTimeoutMs) throws Exception {
         localZkConnectionSvc = new LocalZooKeeperConnectionService(zkClientFactory, zookeeperServers,
                 zookeeperSessionTimeoutMs);
-        localZkConnectionSvc.start(exitCode -> {
-            log.error("Shutting down ZK sessions: {}", exitCode);
+        localZkConnectionSvc.start(new ZookeeperSessionExpiredHandler() {
+            @Override
+            public void onSessionExpired() {
+                log.error("Shutting down ZK sessions: {}", -1);
+            }
+
+            @Override
+            public void setWatcher(ZooKeeperSessionWatcher watcher) {
+
+            }
         });
 
         this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
                 (int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor);
-        localZkConnectionSvc.start(exitCode -> {
-            try {
-                localZkCache.getZooKeeper().close();
-            } catch (InterruptedException e) {
-                log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e);
+        localZkConnectionSvc.start(new ZookeeperSessionExpiredHandler() {
+            @Override
+            public void onSessionExpired() {
+                try {
+                    localZkCache.getZooKeeper().close();
+                } catch (InterruptedException e) {
+                    log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e);
+                }
+            }
+
+            @Override
+            public void setWatcher(ZooKeeperSessionWatcher watcher) {
+
             }
         });
 
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
index 6334c29..e4be27e 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
@@ -59,7 +59,7 @@ public class LocalZooKeeperConnectionService implements Closeable {
         this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
     }
 
-    public void start(ShutdownService shutdownService) throws IOException {
+    public void start(ZookeeperSessionExpiredHandler sessionExpiredHandler) throws IOException {
         // Connect to local ZK
         CompletableFuture<ZooKeeper> zkFuture = zkClientFactory.create(zkConnect, SessionType.ReadWrite,
                 (int) zkSessionTimeoutMillis);
@@ -67,7 +67,7 @@ public class LocalZooKeeperConnectionService implements Closeable {
         try {
             localZooKeeper = zkFuture.get(zkSessionTimeoutMillis, TimeUnit.MILLISECONDS);
             localZooKeeperSessionWatcher = new ZooKeeperSessionWatcher(localZooKeeper, zkSessionTimeoutMillis,
-                    shutdownService);
+                sessionExpiredHandler);
             localZooKeeperSessionWatcher.start();
             localZooKeeper.register(localZooKeeperSessionWatcher);
         } catch (Exception e) {
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index 2688c10..ff132f2 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -34,12 +34,14 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -49,6 +51,7 @@ import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
@@ -503,4 +506,49 @@ public abstract class ZooKeeperCache implements Watcher {
 
         this.backgroundExecutor.shutdown();
     }
+
+    /**
+     * Checks whether the ephemeral znode at {@code regPath} is held by the current ZooKeeper session,
+     * waiting for it to be deleted when it is held by a different session.
+     *
+     * @param regPath path of the registration znode to check
+     * @return {@code true} if the znode exists and its ephemeral owner is the current session;
+     *         {@code false} if the znode does not exist, or was deleted while waiting
+     * @throws IOException if a ZooKeeper error occurs, the thread is interrupted, or the znode owned by
+     *         another session is still present after waiting one session timeout
+     */
+    public boolean checkRegNodeAndWaitExpired(String regPath) throws IOException {
+        final CountDownLatch prevNodeLatch = new CountDownLatch(1);
+        // Only deletion of the previous znode releases the latch; other event types are ignored here.
+        Watcher prevRegNodeWatcher = event -> {
+            if (EventType.NodeDeleted == event.getType()) {
+                prevNodeLatch.countDown();
+            }
+        };
+        try {
+            Stat stat = getZooKeeper().exists(regPath, prevRegNodeWatcher);
+            if (stat == null) {
+                return false;
+            }
+            if (stat.getEphemeralOwner() == getZooKeeper().getSessionId()) {
+                // The znode already belongs to the current session.
+                return true;
+            }
+            // The ephemeral owner is another session: wait for that znode to be deleted.
+            int sessionTimeoutMs = getZooKeeper().getSessionTimeout();
+            log.info("Previous registration znode: {} exists and is owned by another session, so waiting"
+                + " zk sessiontimeout: {} ms for znode deletion", regPath, sessionTimeoutMs);
+            if (!prevNodeLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)) {
+                throw new NodeExistsException(regPath);
+            }
+            return false;
+        } catch (KeeperException ke) {
+            log.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke);
+            throw new IOException("ZK exception checking and wait ephemeral znode "
+                + regPath + " expired", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            log.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie);
+            throw new IOException("Interrupted checking and wait ephemeral znode "
+                + regPath + " expired", ie);
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ZooKeeperCache.class);
 }
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java
index 5bd9326..fa70c9e 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java
@@ -54,7 +54,7 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSessionWatcher.class);
 
-    private final ShutdownService shutdownService;
+    private final ZookeeperSessionExpiredHandler sessionExpiredHandler;
     private final ZooKeeper zk;
     // Maximum time to wait for ZK session to be re-connected to quorum (set to 5/6 of SessionTimeout)
     private final long monitorTimeoutMillis;
@@ -68,11 +68,12 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
     private volatile boolean zkOperationCompleted = false;
     private ScheduledFuture<?> task;
 
-    public ZooKeeperSessionWatcher(ZooKeeper zk, long zkSessionTimeoutMillis, ShutdownService shutdownService) {
+    public ZooKeeperSessionWatcher(ZooKeeper zk, long zkSessionTimeoutMillis, ZookeeperSessionExpiredHandler sessionExpiredHandler) {
         this.zk = zk;
         this.monitorTimeoutMillis = zkSessionTimeoutMillis * 5 / 6;
         this.tickTimeMillis = zkSessionTimeoutMillis / 15;
-        this.shutdownService = shutdownService;
+        this.sessionExpiredHandler = sessionExpiredHandler;
+        this.sessionExpiredHandler.setWatcher(this);
     }
 
     public void start() {
@@ -100,9 +101,7 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
         case None:
             if (eventState == Watcher.Event.KeeperState.Expired) {
                 LOG.error("ZooKeeper session already expired, invoking shutdown");
-                close();
-                shuttingDown = true;
-                shutdownService.shutdown(-1);
+                sessionExpiredHandler.onSessionExpired();
             }
             break;
         default:
@@ -151,10 +150,8 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
                 keeperState = Watcher.Event.KeeperState.Disconnected;
             }
             if (keeperState == Watcher.Event.KeeperState.Expired) {
-                LOG.error("zoo keeper session expired, invoking shutdown service");
-                close();
-                shuttingDown = true;
-                shutdownService.shutdown(-1);
+                LOG.error("zookeeper session expired, invoking shutdown service");
+                sessionExpiredHandler.onSessionExpired();
             } else if (keeperState == Watcher.Event.KeeperState.Disconnected) {
                 if (disconnectedAt == 0) {
                     // this is the first disconnect, we should monitor the time out from now, so we record the time of
@@ -166,9 +163,7 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
                         - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - disconnectedAt);
                 if (timeRemainingMillis <= 0) {
                     LOG.error("timeout expired for reconnecting, invoking shutdown service");
-                    close();
-                    shuttingDown = true;
-                    shutdownService.shutdown(-1);
+                    sessionExpiredHandler.onSessionExpired();
                 } else {
                     LOG.warn("zoo keeper disconnected, waiting to reconnect, time remaining = {} seconds",
                             TimeUnit.MILLISECONDS.toSeconds(timeRemainingMillis));
@@ -189,5 +184,6 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
         if (scheduler != null) {
             scheduler.shutdownNow();
         }
+        shuttingDown = true;
     }
 }
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java
new file mode 100644
index 0000000..fc2f180
--- /dev/null
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+/**
+ * Callback that decides what to do when the Zookeeper session is considered expired.
+ */
+public interface ZookeeperSessionExpiredHandler {
+
+    /**
+     * Invoked by {@link ZooKeeperSessionWatcher} when it sees the {@code Expired} keeper state or when
+     * its timeout for reconnecting runs out.
+     */
+    void onSessionExpired();
+
+    /**
+     * Receives the watcher this handler is attached to. {@link ZooKeeperSessionWatcher} calls this from
+     * its constructor, so implementations can keep the reference, for example to close the watcher
+     * when the session expires.
+     *
+     * @param watcher the session watcher that will invoke {@link #onSessionExpired()}
+     */
+    void setWatcher(ZooKeeperSessionWatcher watcher);
+}
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java
index 4c162f6..1dbb2be 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java
@@ -35,7 +35,16 @@ public class LocalZooKeeperConnectionServiceTest {
         MockedZooKeeperClientFactoryImpl mockZkClientFactory = new MockedZooKeeperClientFactoryImpl();
         LocalZooKeeperConnectionService localZkConnectionService = new LocalZooKeeperConnectionService(
                 mockZkClientFactory, "dummy", 1000);
-        localZkConnectionService.start(null);
+        localZkConnectionService.start(new ZookeeperSessionExpiredHandler() {
+            @Override
+            public void onSessionExpired() {
+            }
+
+            @Override
+            public void setWatcher(ZooKeeperSessionWatcher watcher) {
+
+            }
+        });
 
         // Get ZooKeeper client
         MockZooKeeper zk = (MockZooKeeper) localZkConnectionService.getLocalZooKeeper();
@@ -91,7 +100,16 @@ public class LocalZooKeeperConnectionServiceTest {
         LocalZooKeeperConnectionService localZkConnectionService = new LocalZooKeeperConnectionService(
                 new ZookeeperClientFactoryImpl(), "dummy", 1000);
         try {
-            localZkConnectionService.start(null);
+            localZkConnectionService.start(new ZookeeperSessionExpiredHandler() {
+                @Override
+                public void onSessionExpired() {
+                }
+
+                @Override
+                public void setWatcher(ZooKeeperSessionWatcher watcher) {
+
+                }
+            });
             fail("should fail");
         } catch (Exception e) {
             assertTrue(e.getMessage().contains("Failed to establish session with local ZK"));
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java
index 9c44103..5b49b2b 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java
@@ -23,7 +23,6 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 import org.apache.zookeeper.KeeperException.Code;
-import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event.EventType;
@@ -55,7 +54,20 @@ public class ZooKeeperSessionWatcherTest {
     void setup() {
         zkClient = MockZooKeeper.newInstance();
         shutdownService = new MockShutdownService();
-        sessionWatcher = new ZooKeeperSessionWatcher(zkClient, 1000, shutdownService);
+        sessionWatcher = new ZooKeeperSessionWatcher(zkClient, 1000, new ZookeeperSessionExpiredHandler() {
+
+            private ZooKeeperSessionWatcher watcher;
+            @Override
+            public void onSessionExpired() {
+                watcher.close();
+                shutdownService.shutdown(-1);
+            }
+
+            @Override
+            public void setWatcher(ZooKeeperSessionWatcher watcher) {
+                this.watcher = watcher;
+            }
+        });
     }
 
     @AfterMethod
@@ -113,8 +125,21 @@ public class ZooKeeperSessionWatcherTest {
     }
 
     @Test
-    public void testRun1() throws Exception {
-        ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 1000, shutdownService);
+    void testRun1() throws Exception {
+        ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 1000,
+            new ZookeeperSessionExpiredHandler() {
+                private ZooKeeperSessionWatcher watcher;
+                @Override
+                public void onSessionExpired() {
+                    watcher.close();
+                    shutdownService.shutdown(-1);
+                }
+
+                @Override
+                public void setWatcher(ZooKeeperSessionWatcher watcher) {
+                    this.watcher = watcher;
+                }
+            });
         sessionWatcherZkNull.run();
         assertFalse(sessionWatcherZkNull.isShutdownStarted());
         assertEquals(sessionWatcherZkNull.getKeeperState(), KeeperState.Disconnected);
@@ -123,8 +148,22 @@ public class ZooKeeperSessionWatcherTest {
     }
 
     @Test
-    public void testRun2() throws Exception {
-        ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 0, shutdownService);
+    void testRun2() throws Exception {
+        ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 0,
+            new ZookeeperSessionExpiredHandler() {
+
+                private ZooKeeperSessionWatcher watcher;
+                @Override
+                public void onSessionExpired() {
+                    watcher.close();
+                    shutdownService.shutdown(-1);
+                }
+
+                @Override
+                public void setWatcher(ZooKeeperSessionWatcher watcher) {
+                    this.watcher = watcher;
+                }
+            });
         sessionWatcherZkNull.run();
         assertTrue(sessionWatcherZkNull.isShutdownStarted());
         assertEquals(sessionWatcherZkNull.getKeeperState(), KeeperState.Disconnected);