You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/05 08:48:41 UTC
[pulsar] branch master updated: Refactor zookeeper session timeout
handling into an interface (#6347)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6826040 Refactor zookeeper session timeout handling into an interface (#6347)
6826040 is described below
commit 6826040d32961e3e44f70622bcb89a116935ab68
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Jun 5 16:48:26 2020 +0800
Refactor zookeeper session timeout handling into an interface (#6347)
Refactor zookeeper session timeout handling into an interface
---
conf/broker.conf | 6 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 7 ++
.../pulsar/ZookeeperSessionExpiredHandlers.java | 90 ++++++++++++++++++++++
.../org/apache/pulsar/broker/PulsarService.java | 12 ++-
.../pulsar/broker/namespace/NamespaceService.java | 24 ++++++
.../pulsar/broker/namespace/OwnershipCache.java | 4 +
.../service/web/ZookeeperCacheLoader.java | 32 ++++++--
.../zookeeper/LocalZooKeeperConnectionService.java | 4 +-
.../apache/pulsar/zookeeper/ZooKeeperCache.java | 48 ++++++++++++
.../pulsar/zookeeper/ZooKeeperSessionWatcher.java | 22 +++---
.../zookeeper/ZookeeperSessionExpiredHandler.java | 32 ++++++++
.../LocalZooKeeperConnectionServiceTest.java | 22 +++++-
.../zookeeper/ZooKeeperSessionWatcherTest.java | 51 ++++++++++--
13 files changed, 323 insertions(+), 31 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 2f25984..523f7ad 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -386,6 +386,12 @@ retentionCheckIntervalInSeconds=120
# Use 0 or negative number to disable the check
maxNumPartitionsPerPartitionedTopic=0
+# There are two policies for handling an expired zookeeper session: "shutdown" and "reconnect".
+# With the "shutdown" policy, the broker shuts down when the zookeeper session expires.
+# With the "reconnect" policy, the broker tries to reconnect to the zookeeper server and re-register its metadata in zookeeper.
+# Note: the "reconnect" policy is an experimental feature
+zookeeperSessionExpiredPolicy=shutdown
+
# Enable or disable system topic
systemTopicEnabled=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 3b58ff7..b91fe0a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -707,6 +707,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int maxNumPartitionsPerPartitionedTopic = 0;
+ @FieldContext(
+ doc = "There are two policies when zookeeper session expired happens, \"shutdown\" and \"reconnect\". \n\n"
+ + " If uses \"shutdown\" policy, shutdown the broker when zookeeper session expired happens.\n\n"
+ + " If uses \"reconnect\" policy, try to reconnect to zookeeper server and re-register metadata to zookeeper."
+ )
+ private String zookeeperSessionExpiredPolicy = "shutdown";
+
/**** --- Messaging Protocols --- ****/
@FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java b/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java
new file mode 100644
index 0000000..e654193
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar;
+
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher;
+import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
+import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
+
+/**
+ * Factory methods and implementations of the handlers the broker service uses to react to an
+ * expired Zookeeper session.
+ */
+public class ZookeeperSessionExpiredHandlers {
+
+    /** Policy name for which the broker installs {@link ShutDownWhenSessionExpired}. */
+    public static final String SHUTDOWN_POLICY = "shutdown";
+    /** Policy name for which the broker installs {@link ReconnectWhenSessionExpired}. */
+    public static final String RECONNECT_POLICY = "reconnect";
+
+    /**
+     * Creates a handler that shuts the service down when the Zookeeper session expires.
+     *
+     * @param shutdownService service used to perform the shutdown
+     * @return a new {@link ShutDownWhenSessionExpired}
+     */
+    public static ZookeeperSessionExpiredHandler shutdownWhenZookeeperSessionExpired(ShutdownService shutdownService) {
+        return new ShutDownWhenSessionExpired(shutdownService);
+    }
+
+    /**
+     * Creates a handler that re-registers the owned bundles when the Zookeeper session expires.
+     *
+     * @param pulsarService   broker whose namespace service performs the re-registration
+     * @param shutdownService service used to shut down when no namespace service is available
+     * @return a new {@link ReconnectWhenSessionExpired}
+     */
+    public static ZookeeperSessionExpiredHandler reconnectWhenZookeeperSessionExpired(PulsarService pulsarService, ShutdownService shutdownService) {
+        return new ReconnectWhenSessionExpired(pulsarService, shutdownService);
+    }
+
+    /**
+     * Shutdown the messaging service when Zookeeper session expired.
+     */
+    public static class ShutDownWhenSessionExpired implements ZookeeperSessionExpiredHandler {
+
+        private final ShutdownService shutdownService;
+        private ZooKeeperSessionWatcher watcher;
+
+        public ShutDownWhenSessionExpired(ShutdownService shutdownService) {
+            this.shutdownService = shutdownService;
+        }
+
+        @Override
+        public void setWatcher(ZooKeeperSessionWatcher watcher) {
+            this.watcher = watcher;
+        }
+
+        /** Closes the session watcher, then requests a shutdown with exit code -1. */
+        @Override
+        public void onSessionExpired() {
+            this.watcher.close();
+            this.shutdownService.shutdown(-1);
+        }
+    }
+
+    /**
+     * Reconnect to the zookeeper server and re-register ownership cache to avoid ownership change.
+     */
+    public static class ReconnectWhenSessionExpired implements ZookeeperSessionExpiredHandler {
+
+        private final PulsarService pulsarService;
+        private ZooKeeperSessionWatcher watcher;
+        private final ShutdownService shutdownService;
+
+        public ReconnectWhenSessionExpired(PulsarService pulsarService, ShutdownService shutdownService) {
+            this.pulsarService = pulsarService;
+            this.shutdownService = shutdownService;
+        }
+
+        /**
+         * Re-registers the owned bundles through the namespace service. When the broker has no
+         * namespace service, closes the watcher and requests a shutdown with exit code -1 instead.
+         */
+        @Override
+        public void onSessionExpired() {
+            if (this.pulsarService.getNamespaceService() == null) {
+                this.watcher.close();
+                this.shutdownService.shutdown(-1);
+                // Stop here: falling through would dereference the null namespace service.
+                return;
+            }
+            this.pulsarService.getNamespaceService().registerOwnedBundles();
+        }
+
+        @Override
+        public void setWatcher(ZooKeeperSessionWatcher watcher) {
+            this.watcher = watcher;
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 689b0a4..b2d5b7a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -132,6 +132,8 @@ import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
+import org.apache.pulsar.ZookeeperSessionExpiredHandlers;
+import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
@@ -417,7 +419,15 @@ public class PulsarService implements AutoCloseable {
// Now we are ready to start services
localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
config.getZookeeperServers(), config.getZooKeeperSessionTimeoutMillis());
- localZooKeeperConnectionProvider.start(shutdownService);
+ ZookeeperSessionExpiredHandler sessionExpiredHandler = null;
+ if (ZookeeperSessionExpiredHandlers.RECONNECT_POLICY.equals(config.getZookeeperSessionExpiredPolicy())) {
+ sessionExpiredHandler = ZookeeperSessionExpiredHandlers.reconnectWhenZookeeperSessionExpired(this, shutdownService);
+ } else if (ZookeeperSessionExpiredHandlers.SHUTDOWN_POLICY.equals(config.getZookeeperSessionExpiredPolicy())) {
+ sessionExpiredHandler = ZookeeperSessionExpiredHandlers.shutdownWhenZookeeperSessionExpired(shutdownService);
+ } else {
+ throw new IllegalArgumentException("Invalid zookeeper session expired policy " + config.getZookeeperSessionExpiredPolicy());
+ }
+ localZooKeeperConnectionProvider.start(sessionExpiredHandler);
// Initialize and start service to access configuration repository.
this.startZkCacheService();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 943761e..e917db7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -74,6 +74,7 @@ import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -1267,4 +1268,27 @@ public class NamespaceService {
}
return isNameSpaceRegistered;
}
+
+ public void registerOwnedBundles() {
+ List<OwnedBundle> ownedBundles = new ArrayList<>(ownershipCache.getOwnedBundles().values());
+ ownershipCache.invalidateLocalOwnerCache();
+ ownedBundles.forEach(ownedBundle -> {
+ String path = ServiceUnitZkUtils.path(ownedBundle.getNamespaceBundle());
+ try {
+ if (!pulsar.getLocalZkCache().checkRegNodeAndWaitExpired(path)) {
+ ownershipCache.tryAcquiringOwnership(ownedBundle.getNamespaceBundle());
+ }
+ } catch (Exception e) {
+ try {
+ ownedBundle.handleUnloadRequest(pulsar, 5, TimeUnit.MINUTES);
+ } catch (IllegalStateException ex) {
+ // The owned bundle is not in active state.
+ } catch (Exception ex) {
+ LOG.error("Unexpected exception occur when register owned bundle {}. Shutdown broker now !!!",
+ ownedBundle.getNamespaceBundle(), ex);
+ pulsar.getShutdownService().shutdown(-1);
+ }
+ }
+ });
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 7acd5f8..3d09b96 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -372,6 +372,10 @@ public class OwnershipCache {
}
}
+ /**
+ * Invalidates every entry of the local owned-bundles cache through its synchronous view.
+ */
+ public void invalidateLocalOwnerCache() {
+ this.ownedBundlesCache.synchronous().invalidateAll();
+ }
+
public NamespaceEphemeralData getSelfOwnerInfo() {
return selfOwnerInfo;
}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
index 66341fd..56332e7 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
@@ -34,6 +34,8 @@ import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher;
+import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,17 +68,33 @@ public class ZookeeperCacheLoader implements Closeable {
int zookeeperSessionTimeoutMs) throws Exception {
localZkConnectionSvc = new LocalZooKeeperConnectionService(zkClientFactory, zookeeperServers,
zookeeperSessionTimeoutMs);
- localZkConnectionSvc.start(exitCode -> {
- log.error("Shutting down ZK sessions: {}", exitCode);
+ localZkConnectionSvc.start(new ZookeeperSessionExpiredHandler() {
+ @Override
+ public void onSessionExpired() {
+ log.error("Shutting down ZK sessions: {}", -1);
+ }
+
+ @Override
+ public void setWatcher(ZooKeeperSessionWatcher watcher) {
+
+ }
});
this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
(int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor);
- localZkConnectionSvc.start(exitCode -> {
- try {
- localZkCache.getZooKeeper().close();
- } catch (InterruptedException e) {
- log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e);
+ localZkConnectionSvc.start(new ZookeeperSessionExpiredHandler() {
+ @Override
+ public void onSessionExpired() {
+ try {
+ localZkCache.getZooKeeper().close();
+ } catch (InterruptedException e) {
+ log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void setWatcher(ZooKeeperSessionWatcher watcher) {
+
}
});
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
index 6334c29..e4be27e 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
@@ -59,7 +59,7 @@ public class LocalZooKeeperConnectionService implements Closeable {
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
}
- public void start(ShutdownService shutdownService) throws IOException {
+ public void start(ZookeeperSessionExpiredHandler sessionExpiredHandler) throws IOException {
// Connect to local ZK
CompletableFuture<ZooKeeper> zkFuture = zkClientFactory.create(zkConnect, SessionType.ReadWrite,
(int) zkSessionTimeoutMillis);
@@ -67,7 +67,7 @@ public class LocalZooKeeperConnectionService implements Closeable {
try {
localZooKeeper = zkFuture.get(zkSessionTimeoutMillis, TimeUnit.MILLISECONDS);
localZooKeeperSessionWatcher = new ZooKeeperSessionWatcher(localZooKeeper, zkSessionTimeoutMillis,
- shutdownService);
+ sessionExpiredHandler);
localZooKeeperSessionWatcher.start();
localZooKeeper.register(localZooKeeperSessionWatcher);
} catch (Exception e) {
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index 2688c10..ff132f2 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -34,12 +34,14 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.pulsar.common.util.FutureUtil;
@@ -49,6 +51,7 @@ import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
@@ -503,4 +506,49 @@ public abstract class ZooKeeperCache implements Watcher {
this.backgroundExecutor.shutdown();
}
+
+    /**
+     * Checks the ephemeral znode at {@code regPath} and, when it belongs to a different zookeeper
+     * session, waits up to one session timeout for it to be deleted.
+     *
+     * @param regPath path of the znode to check
+     * @return {@code true} if the znode exists and its ephemeral owner is the current zookeeper
+     *         session; {@code false} if the znode does not exist or was deleted while waiting
+     * @throws IOException if a zookeeper operation fails, if the znode still exists once the
+     *         wait times out, or if the calling thread is interrupted (the interrupt flag is
+     *         restored before throwing)
+     */
+    public boolean checkRegNodeAndWaitExpired(String regPath) throws IOException {
+        final CountDownLatch prevNodeLatch = new CountDownLatch(1);
+        // Only deletion of the previous znode releases the latch; session expiry is not handled here.
+        Watcher zkPrevRegNodeWatcher = event -> {
+            if (EventType.NodeDeleted == event.getType()) {
+                prevNodeLatch.countDown();
+            }
+        };
+        try {
+            Stat stat = getZooKeeper().exists(regPath, zkPrevRegNodeWatcher);
+            if (null == stat) {
+                return false;
+            }
+            if (stat.getEphemeralOwner() == getZooKeeper().getSessionId()) {
+                // The znode already belongs to the current zookeeper session.
+                return true;
+            }
+            // The ephemeral owner is another session: wait for that session's znode to go away.
+            log.info("Previous registration znode: {} exists and is owned by another session, so waiting"
+                    + " zk sessiontimeout: {} ms for znode deletion", regPath, getZooKeeper().getSessionTimeout());
+            if (!prevNodeLatch.await(getZooKeeper().getSessionTimeout(), TimeUnit.MILLISECONDS)) {
+                // Surfaces to the caller as an IOException via the KeeperException handler below.
+                throw new NodeExistsException(regPath);
+            }
+            return false;
+        } catch (KeeperException ke) {
+            log.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke);
+            throw new IOException("ZK exception checking and wait ephemeral znode "
+                    + regPath + " expired", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            log.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie);
+            throw new IOException("Interrupted checking and wait ephemeral znode "
+                    + regPath + " expired", ie);
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ZooKeeperCache.class);
}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java
index 5bd9326..fa70c9e 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java
@@ -54,7 +54,7 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSessionWatcher.class);
- private final ShutdownService shutdownService;
+ private final ZookeeperSessionExpiredHandler sessionExpiredHandler;
private final ZooKeeper zk;
// Maximum time to wait for ZK session to be re-connected to quorum (set to 5/6 of SessionTimeout)
private final long monitorTimeoutMillis;
@@ -68,11 +68,12 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
private volatile boolean zkOperationCompleted = false;
private ScheduledFuture<?> task;
- public ZooKeeperSessionWatcher(ZooKeeper zk, long zkSessionTimeoutMillis, ShutdownService shutdownService) {
+ public ZooKeeperSessionWatcher(ZooKeeper zk, long zkSessionTimeoutMillis, ZookeeperSessionExpiredHandler sessionExpiredHandler) {
this.zk = zk;
this.monitorTimeoutMillis = zkSessionTimeoutMillis * 5 / 6;
this.tickTimeMillis = zkSessionTimeoutMillis / 15;
- this.shutdownService = shutdownService;
+ this.sessionExpiredHandler = sessionExpiredHandler;
+ this.sessionExpiredHandler.setWatcher(this);
}
public void start() {
@@ -100,9 +101,7 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
case None:
if (eventState == Watcher.Event.KeeperState.Expired) {
LOG.error("ZooKeeper session already expired, invoking shutdown");
- close();
- shuttingDown = true;
- shutdownService.shutdown(-1);
+ sessionExpiredHandler.onSessionExpired();
}
break;
default:
@@ -151,10 +150,8 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
keeperState = Watcher.Event.KeeperState.Disconnected;
}
if (keeperState == Watcher.Event.KeeperState.Expired) {
- LOG.error("zoo keeper session expired, invoking shutdown service");
- close();
- shuttingDown = true;
- shutdownService.shutdown(-1);
+ LOG.error("zookeeper session expired, invoking shutdown service");
+ sessionExpiredHandler.onSessionExpired();
} else if (keeperState == Watcher.Event.KeeperState.Disconnected) {
if (disconnectedAt == 0) {
// this is the first disconnect, we should monitor the time out from now, so we record the time of
@@ -166,9 +163,7 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - disconnectedAt);
if (timeRemainingMillis <= 0) {
LOG.error("timeout expired for reconnecting, invoking shutdown service");
- close();
- shuttingDown = true;
- shutdownService.shutdown(-1);
+ sessionExpiredHandler.onSessionExpired();
} else {
LOG.warn("zoo keeper disconnected, waiting to reconnect, time remaining = {} seconds",
TimeUnit.MILLISECONDS.toSeconds(timeRemainingMillis));
@@ -189,5 +184,6 @@ public class ZooKeeperSessionWatcher implements Watcher, StatCallback, Runnable,
if (scheduler != null) {
scheduler.shutdownNow();
}
+ shuttingDown = true;
}
}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java
new file mode 100644
index 0000000..fc2f180
--- /dev/null
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.zookeeper;
+
+/**
+ * Handler interface on Zookeeper session expired
+ */
+public interface ZookeeperSessionExpiredHandler {
+
+ /**
+ * Signal when zookeeper session is expired.
+ */
+ void onSessionExpired();
+
+ /**
+ * Supplies the session watcher this handler is attached to, so that implementations can keep
+ * a reference to it (for example to close it from {@link #onSessionExpired()}).
+ *
+ * @param watcher the session watcher that will invoke this handler
+ */
+ void setWatcher(ZooKeeperSessionWatcher watcher);
+}
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java
index 4c162f6..1dbb2be 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java
@@ -35,7 +35,16 @@ public class LocalZooKeeperConnectionServiceTest {
MockedZooKeeperClientFactoryImpl mockZkClientFactory = new MockedZooKeeperClientFactoryImpl();
LocalZooKeeperConnectionService localZkConnectionService = new LocalZooKeeperConnectionService(
mockZkClientFactory, "dummy", 1000);
- localZkConnectionService.start(null);
+ localZkConnectionService.start(new ZookeeperSessionExpiredHandler() {
+ @Override
+ public void onSessionExpired() {
+ }
+
+ @Override
+ public void setWatcher(ZooKeeperSessionWatcher watcher) {
+
+ }
+ });
// Get ZooKeeper client
MockZooKeeper zk = (MockZooKeeper) localZkConnectionService.getLocalZooKeeper();
@@ -91,7 +100,16 @@ public class LocalZooKeeperConnectionServiceTest {
LocalZooKeeperConnectionService localZkConnectionService = new LocalZooKeeperConnectionService(
new ZookeeperClientFactoryImpl(), "dummy", 1000);
try {
- localZkConnectionService.start(null);
+ localZkConnectionService.start(new ZookeeperSessionExpiredHandler() {
+ @Override
+ public void onSessionExpired() {
+ }
+
+ @Override
+ public void setWatcher(ZooKeeperSessionWatcher watcher) {
+
+ }
+ });
fail("should fail");
} catch (Exception e) {
assertTrue(e.getMessage().contains("Failed to establish session with local ZK"));
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java
index 9c44103..5b49b2b 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java
@@ -23,7 +23,6 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import org.apache.zookeeper.KeeperException.Code;
-import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
@@ -55,7 +54,20 @@ public class ZooKeeperSessionWatcherTest {
void setup() {
zkClient = MockZooKeeper.newInstance();
shutdownService = new MockShutdownService();
- sessionWatcher = new ZooKeeperSessionWatcher(zkClient, 1000, shutdownService);
+ sessionWatcher = new ZooKeeperSessionWatcher(zkClient, 1000, new ZookeeperSessionExpiredHandler() {
+
+ private ZooKeeperSessionWatcher watcher;
+ @Override
+ public void onSessionExpired() {
+ watcher.close();
+ shutdownService.shutdown(-1);
+ }
+
+ @Override
+ public void setWatcher(ZooKeeperSessionWatcher watcher) {
+ this.watcher = watcher;
+ }
+ });
}
@AfterMethod
@@ -113,8 +125,21 @@ public class ZooKeeperSessionWatcherTest {
}
@Test
- public void testRun1() throws Exception {
- ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 1000, shutdownService);
+ void testRun1() throws Exception {
+ ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 1000,
+ new ZookeeperSessionExpiredHandler() {
+ private ZooKeeperSessionWatcher watcher;
+ @Override
+ public void onSessionExpired() {
+ watcher.close();
+ shutdownService.shutdown(-1);
+ }
+
+ @Override
+ public void setWatcher(ZooKeeperSessionWatcher watcher) {
+ this.watcher = watcher;
+ }
+ });
sessionWatcherZkNull.run();
assertFalse(sessionWatcherZkNull.isShutdownStarted());
assertEquals(sessionWatcherZkNull.getKeeperState(), KeeperState.Disconnected);
@@ -123,8 +148,22 @@ public class ZooKeeperSessionWatcherTest {
}
@Test
- public void testRun2() throws Exception {
- ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 0, shutdownService);
+ void testRun2() throws Exception {
+ ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 0,
+ new ZookeeperSessionExpiredHandler() {
+
+ private ZooKeeperSessionWatcher watcher;
+ @Override
+ public void onSessionExpired() {
+ watcher.close();
+ shutdownService.shutdown(-1);
+ }
+
+ @Override
+ public void setWatcher(ZooKeeperSessionWatcher watcher) {
+ this.watcher = watcher;
+ }
+ });
sessionWatcherZkNull.run();
assertTrue(sessionWatcherZkNull.isShutdownStarted());
assertEquals(sessionWatcherZkNull.getKeeperState(), KeeperState.Disconnected);