You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/01/26 22:11:21 UTC
bookkeeper git commit: BOOKKEEPER-855: handle session expire event in
bookie (sijie)
Repository: bookkeeper
Updated Branches:
refs/heads/master 19160e44d -> 92722ee9c
BOOKKEEPER-855: handle session expire event in bookie (sijie)
This change is to retry bookie registration when zookeeper session expired.
Author: Sijie Guo <si...@apache.org>
Reviewers: Ivan Kelly <iv...@apache.org>, Matteo Merli <mm...@apache.org>
Closes #1 from sijie/sijie/BOOKKEEPER-855
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/92722ee9
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/92722ee9
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/92722ee9
Branch: refs/heads/master
Commit: 92722ee9c34b069e23d1a87d7fc78256b8540268
Parents: 19160e4
Author: Sijie Guo <si...@apache.org>
Authored: Tue Jan 26 13:11:04 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Jan 26 13:11:04 2016 -0800
----------------------------------------------------------------------
.../org/apache/bookkeeper/bookie/Bookie.java | 138 +++++++++++++-----
.../bookkeeper/bookie/ReadOnlyBookie.java | 66 +--------
.../bookkeeper/conf/ServerConfiguration.java | 44 ++++++
.../bookie/BookieInitializationTest.java | 4 +-
.../bookkeeper/client/TestBookieWatcher.java | 140 +++++++++++++++++++
.../replication/AuditorLedgerCheckerTest.java | 2 +-
.../replication/TestReplicationWorker.java | 2 +-
.../bookkeeper/test/BookieZKExpireTest.java | 15 +-
.../bookkeeper/test/ReadOnlyBookieTest.java | 2 +-
.../zookeeper/TestZooKeeperClient.java | 8 +-
10 files changed, 312 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 07b3d30..74876ff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -34,14 +34,18 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -61,12 +65,10 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -119,6 +121,7 @@ public class Bookie extends BookieCriticalThread {
// ZK registration path for this bookie
protected final String bookieRegistrationPath;
+ protected final String bookieReadonlyRegistrationPath;
private final LedgerDirsManager ledgerDirsManager;
private LedgerDirsManager indexDirsManager;
@@ -142,7 +145,11 @@ public class Bookie extends BookieCriticalThread {
final protected String zkBookieRegPath;
final protected String zkBookieReadOnlyPath;
+ final private AtomicBoolean zkRegistered = new AtomicBoolean(false);
final protected AtomicBoolean readOnly = new AtomicBoolean(false);
+ // executor to manage the state changes for a bookie.
+ final ExecutorService stateService = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("BookieStateService-%d").build());
// Expose Stats
private final Counter writeBytes;
@@ -468,6 +475,8 @@ public class Bookie extends BookieCriticalThread {
throws IOException, KeeperException, InterruptedException, BookieException {
super("Bookie-" + conf.getBookiePort());
this.bookieRegistrationPath = conf.getZkAvailableBookiesPath() + "/";
+ this.bookieReadonlyRegistrationPath =
+ this.bookieRegistrationPath + BookKeeperConstants.READONLY;
this.conf = conf;
this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
@@ -512,7 +521,7 @@ public class Bookie extends BookieCriticalThread {
// ZK ephemeral node for this Bookie.
String myID = getMyId();
zkBookieRegPath = this.bookieRegistrationPath + myID;
- zkBookieReadOnlyPath = this.bookieRegistrationPath + BookKeeperConstants.READONLY + "/" + myID;
+ zkBookieReadOnlyPath = this.bookieReadonlyRegistrationPath + "/" + myID;
// Expose Stats
writeBytes = statsLogger.getCounter(WRITE_BYTES);
@@ -522,7 +531,7 @@ public class Bookie extends BookieCriticalThread {
readEntryStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY);
addBytesStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY_BYTES);
readBytesStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY_BYTES);
- // 1 : up, 0 : readonly
+ // 1 : up, 0 : readonly, -1 : unregistered
statsLogger.registerGauge(SERVER_STATUS, new Gauge<Number>() {
@Override
public Number getDefaultValue() {
@@ -531,7 +540,7 @@ public class Bookie extends BookieCriticalThread {
@Override
public Number getSample() {
- return readOnly.get() ? 0 : 1;
+ return zkRegistered.get() ? (readOnly.get() ? 0 : 1) : -1;
}
});
}
@@ -541,6 +550,7 @@ public class Bookie extends BookieCriticalThread {
}
void readJournal() throws IOException, BookieException {
+ long startTs = MathUtils.now();
journal.replay(new JournalScanner() {
@Override
public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
@@ -590,6 +600,8 @@ public class Bookie extends BookieCriticalThread {
}
}
});
+ long elapsedTs = MathUtils.now() - startTs;
+ LOG.info("Finished replaying journal in {} ms.", elapsedTs);
}
@Override
@@ -632,9 +644,9 @@ public class Bookie extends BookieCriticalThread {
// if setting it in bookie thread, the watcher might run before bookie thread.
running = true;
try {
- registerBookie(conf);
- } catch (IOException e) {
- LOG.error("Couldn't register bookie with zookeeper, shutting down", e);
+ registerBookie(true).get();
+ } catch (Exception e) {
+ LOG.error("Couldn't register bookie with zookeeper, shutting down : ", e);
shutdown(ExitCode.ZK_REG_FAIL);
}
}
@@ -800,19 +812,46 @@ public class Bookie extends BookieCriticalThread {
/**
* Register as an available bookie
*/
- protected void registerBookie(ServerConfiguration conf) throws IOException {
+ protected Future<Void> registerBookie(final boolean throwException) {
+ return stateService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ try {
+ doRegisterBookie();
+ } catch (IOException ioe) {
+ if (throwException) {
+ throw ioe;
+ } else {
+ LOG.error("Couldn't register bookie with zookeeper, shutting down : ", ioe);
+ triggerBookieShutdown(ExitCode.ZK_REG_FAIL);
+ }
+ }
+ return (Void)null;
+ }
+ });
+ }
+
+ protected void doRegisterBookie() throws IOException {
+ doRegisterBookie(readOnly.get() ? zkBookieReadOnlyPath : zkBookieRegPath);
+ }
+
+ private void doRegisterBookie(final String regPath) throws IOException {
if (null == zk) {
// zookeeper instance is null, means not register itself to zk
return;
}
+ zkRegistered.set(false);
+
// ZK ephemeral node for this Bookie.
try{
- if (!checkRegNodeAndWaitExpired(zkBookieRegPath)) {
+ if (!checkRegNodeAndWaitExpired(regPath)) {
// Create the ZK ephemeral node for this Bookie.
- zk.create(zkBookieRegPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ zk.create(regPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
+ LOG.info("Registered myself in ZooKeeper at {}.", regPath);
}
+ zkRegistered.set(true);
} catch (KeeperException ke) {
LOG.error("ZK exception registering ephemeral Znode for Bookie!", ke);
// Throw an IOException back up. This will cause the Bookie
@@ -832,14 +871,31 @@ public class Bookie extends BookieCriticalThread {
/**
* Transition the bookie from readOnly mode to writable
*/
+ private Future<Void> transitionToWritableMode() {
+ return stateService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ doTransitionToWritableMode();
+ return null;
+ }
+ });
+ }
+
@VisibleForTesting
- public void transitionToWritableMode() {
+ public void doTransitionToWritableMode() {
+ if (shuttingdown) {
+ return;
+ }
if (!readOnly.compareAndSet(true, false)) {
return;
}
LOG.info("Transitioning Bookie to Writable mode and will serve read/write requests.");
+ // change zookeeper state only when using zookeeper
+ if (null == zk) {
+ return;
+ }
try {
- this.registerBookie(conf);
+ doRegisterBookie(zkBookieRegPath);
} catch (IOException e) {
LOG.warn("Error in transitioning back to writable mode : ", e);
transitionToReadOnlyMode();
@@ -863,12 +919,21 @@ public class Bookie extends BookieCriticalThread {
/**
* Transition the bookie to readOnly mode
*/
+ private Future<Void> transitionToReadOnlyMode() {
+ return stateService.submit(new Callable<Void>() {
+ @Override
+ public Void call() {
+ doTransitionToReadOnlyMode();
+ return (Void)null;
+ }
+ });
+ }
+
@VisibleForTesting
- public void transitionToReadOnlyMode() {
+ public void doTransitionToReadOnlyMode() {
if (shuttingdown) {
return;
}
-
if (!readOnly.compareAndSet(false, true)) {
return;
}
@@ -882,22 +947,20 @@ public class Bookie extends BookieCriticalThread {
}
LOG.info("Transitioning Bookie to ReadOnly mode,"
+ " and will serve only read requests from clients!");
+ // change zookeeper state only when using zookeeper
+ if (null == zk) {
+ return;
+ }
try {
- if (null == zk.exists(this.bookieRegistrationPath
- + BookKeeperConstants.READONLY, false)) {
+ if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) {
try {
- zk.create(this.bookieRegistrationPath
- + BookKeeperConstants.READONLY, new byte[0],
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create(this.bookieReadonlyRegistrationPath, new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// this node is just now created by someone.
}
}
- if (!checkRegNodeAndWaitExpired(zkBookieReadOnlyPath)) {
- // Create the readonly node
- zk.create(zkBookieReadOnlyPath,
- new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- }
+ doRegisterBookie(zkBookieReadOnlyPath);
try {
// Clear the current registered node
zk.delete(zkBookieRegPath, -1);
@@ -948,16 +1011,22 @@ public class Bookie extends BookieCriticalThread {
*
* @return zk client instance
*/
- private ZooKeeper newZookeeper(ServerConfiguration conf) throws IOException, InterruptedException,
- KeeperException {
+ private ZooKeeper newZookeeper(final ServerConfiguration conf)
+ throws IOException, InterruptedException, KeeperException {
Set<Watcher> watchers = new HashSet<Watcher>();
watchers.add(new Watcher() {
@Override
public void process(WatchedEvent event) {
+ if (!running) {
+ // do nothing until first registration
+ return;
+ }
// Check for expired connection.
- if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
- LOG.error("ZK client connection to the ZK server has expired!");
- shutdown(ExitCode.ZK_EXPIRED);
+ if (event.getType().equals(EventType.None) &&
+ event.getState().equals(KeeperState.Expired)) {
+ zkRegistered.set(false);
+ // schedule a re-register operation
+ registerBookie(false);
}
}
});
@@ -965,8 +1034,8 @@ public class Bookie extends BookieCriticalThread {
.connectString(conf.getZkServers())
.sessionTimeoutMs(conf.getZkTimeout())
.watchers(watchers)
- .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(),
- conf.getZkTimeout(), Integer.MAX_VALUE))
+ .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkRetryBackoffStartMs(),
+ conf.getZkRetryBackoffMaxMs(), Integer.MAX_VALUE))
.build();
}
@@ -982,7 +1051,9 @@ public class Bookie extends BookieCriticalThread {
journal.start();
// wait until journal quits
journal.join();
+ LOG.info("Journal thread quits.");
} catch (InterruptedException ie) {
+ LOG.warn("Interrupted on running journal thread : ", ie);
}
// if the journal thread quits due to shutting down, it is ok
if (!shuttingdown) {
@@ -1033,6 +1104,9 @@ public class Bookie extends BookieCriticalThread {
// mark bookie as in shutting down progress
shuttingdown = true;
+ // Shutdown the state service
+ stateService.shutdown();
+
// Shutdown journal
journal.shutdown();
this.join();
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
index d354fb3..8b42029 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
@@ -25,16 +25,10 @@ import java.io.IOException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* Implements a read only bookie.
*
@@ -55,66 +49,18 @@ public class ReadOnlyBookie extends Bookie {
LOG.error(err);
throw new IOException(err);
}
- LOG.info("successed call ReadOnlyBookie constructor");
- }
-
- /**
- * Register as a read only bookie
- */
- @Override
- protected void registerBookie(ServerConfiguration conf) throws IOException {
- if (null == zk) {
- // zookeeper instance is null, means not register itself to zk
- return;
- }
-
- // ZK node for this ReadOnly Bookie.
- try{
- if (null == zk.exists(this.bookieRegistrationPath
- + BookKeeperConstants.READONLY, false)) {
- try {
- zk.create(this.bookieRegistrationPath
- + BookKeeperConstants.READONLY + "/", new byte[0],
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- LOG.debug("successed create ReadOnlyBookie parent zk node");
- } catch (NodeExistsException e) {
- // this node is just now created by someone.
- }
- }
-
- if (!checkRegNodeAndWaitExpired(zkBookieReadOnlyPath)) {
- // Create the ZK node for this RO Bookie.
- zk.create(zkBookieReadOnlyPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL);
- LOG.debug("successed create ReadOnlyBookie zk node");
- }
- } catch (KeeperException ke) {
- LOG.error("ZK exception registering Znode for ReadOnly Bookie!", ke);
- // Throw an IOException back up. This will cause the Bookie
- // constructor to error out. Alternatively, we could do a System
- // exit here as this is a fatal error.
- throw new IOException(ke);
- } catch (InterruptedException ie) {
- LOG.error("Interruptted exception registering Znode for ReadOnly Bookie!",
- ie);
- // Throw an IOException back up. This will cause the Bookie
- // constructor to error out. Alternatively, we could do a System
- // exit here as this is a fatal error.
- throw new IOException(ie);
- }
+ LOG.info("Running bookie in readonly mode.");
}
- @VisibleForTesting
@Override
- public void transitionToWritableMode() {
+ public void doTransitionToWritableMode() {
+ // no-op
LOG.info("Skip transition to writable mode for readonly bookie");
}
-
- @VisibleForTesting
@Override
- public void transitionToReadOnlyMode() {
- LOG.warn("Skip transition to readonly mode for readonly bookie");
+ public void doTransitionToReadOnlyMode() {
+ // no-op
+ LOG.info("Skip transition to readonly mode for readonly bookie");
}
-
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index cc2dda6..d4305ca 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -84,6 +84,8 @@ public class ServerConfiguration extends AbstractConfiguration {
// Zookeeper Parameters
protected final static String ZK_TIMEOUT = "zkTimeout";
protected final static String ZK_SERVERS = "zkServers";
+ protected final static String ZK_RETRY_BACKOFF_START_MS = "zkRetryBackoffStartMs";
+ protected final static String ZK_RETRY_BACKOFF_MAX_MS = "zkRetryBackoffMaxMs";
protected final static String OPEN_LEDGER_REREPLICATION_GRACE_PERIOD = "openLedgerRereplicationGracePeriod";
//ReadOnly mode support on all disk full
protected final static String READ_ONLY_MODE_ENABLED = "readOnlyModeEnabled";
@@ -658,6 +660,48 @@ public class ServerConfiguration extends AbstractConfiguration {
}
/**
+ * Get zookeeper client backoff retry start time in millis.
+ *
+ * @return zk backoff retry start time in millis.
+ */
+ public int getZkRetryBackoffStartMs() {
+ return getInt(ZK_RETRY_BACKOFF_START_MS, getZkTimeout());
+ }
+
+ /**
+ * Set zookeeper client backoff retry start time in millis.
+ *
+ * @param retryMs
+ * backoff retry start time in millis.
+ * @return server configuration.
+ */
+ public ServerConfiguration setZkRetryBackoffStartMs(int retryMs) {
+ setProperty(ZK_RETRY_BACKOFF_START_MS, retryMs);
+ return this;
+ }
+
+ /**
+ * Get zookeeper client backoff retry max time in millis.
+ *
+ * @return zk backoff retry max time in millis.
+ */
+ public int getZkRetryBackoffMaxMs() {
+ return getInt(ZK_RETRY_BACKOFF_MAX_MS, getZkTimeout());
+ }
+
+ /**
+ * Set zookeeper client backoff retry max time in millis.
+ *
+ * @param retryMs
+ * backoff retry max time in millis.
+ * @return server configuration.
+ */
+ public ServerConfiguration setZkRetryBackoffMaxMs(int retryMs) {
+ setProperty(ZK_RETRY_BACKOFF_MAX_MS, retryMs);
+ return this;
+ }
+
+ /**
* Is statistics enabled
*
* @return is statistics enabled
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 5db8aad..39dfac6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -51,7 +51,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
.getLogger(BookieInitializationTest.class);
ZooKeeper newzk = null;
-
+
public BookieInitializationTest() {
super(0);
}
@@ -71,7 +71,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
}
void testRegisterBookie(ServerConfiguration conf) throws IOException {
- super.registerBookie(conf);
+ super.doRegisterBookie();
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
new file mode 100644
index 0000000..9558ddc
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestBookieWatcher extends BookKeeperClusterTestCase {
+
+ public TestBookieWatcher() {
+ super(2);
+ }
+
+ private void expireZooKeeperSession(ZooKeeper zk, int timeout)
+ throws IOException, InterruptedException, KeeperException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ ZooKeeper newZk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), timeout,
+ new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (event.getType() == EventType.None &&
+ event.getState() == KeeperState.SyncConnected) {
+ latch.countDown();
+ }
+ }
+
+ }, zk.getSessionId(), zk.getSessionPasswd());
+ if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
+ throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+ }
+ newZk.close();
+ }
+
+ @Test(timeout=10000)
+ public void testBookieWatcherSurviveWhenSessionExpired() throws Exception {
+ final int timeout = 2000;
+ ZooKeeper zk = ZooKeeperClient.newBuilder()
+ .connectString(zkUtil.getZooKeeperConnectString())
+ .sessionTimeoutMs(timeout)
+ .build();
+ try {
+ runBookieWatcherWhenSessionExpired(zk, timeout, true);
+ } finally {
+ zk.close();
+ }
+ }
+
+ @Test(timeout=10000)
+ public void testBookieWatcherDieWhenSessionExpired() throws Exception {
+ final int timeout = 2000;
+ final CountDownLatch connectLatch = new CountDownLatch(1);
+ ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), timeout, new Watcher() {
+ @Override
+ public void process(WatchedEvent watchedEvent) {
+ if (EventType.None == watchedEvent.getType() &&
+ KeeperState.SyncConnected == watchedEvent.getState()) {
+ connectLatch.countDown();
+ }
+ }
+ });
+ connectLatch.await();
+ try {
+ runBookieWatcherWhenSessionExpired(zk, timeout, false);
+ } finally {
+ zk.close();
+ }
+ }
+
+ private void runBookieWatcherWhenSessionExpired(ZooKeeper zk, int timeout, boolean reconnectable)
+ throws Exception {
+ ClientConfiguration conf = new ClientConfiguration();
+ BookKeeper bkc = new BookKeeper(conf, zk);
+
+ LedgerHandle lh;
+ try {
+ lh = bkc.createLedger(3, 2, 2, BookKeeper.DigestType.CRC32, new byte[] {});
+ fail("Should fail to create ledger due to not enough bookies.");
+ } catch (BKException bke) {
+ // expected
+ }
+
+ // make zookeeper session expired
+ expireZooKeeperSession(bkc.zk, timeout);
+ TimeUnit.MILLISECONDS.sleep(3 * timeout);
+
+ // start four new bookies
+ for (int i=0; i<2; i++) {
+ startNewBookie();
+ }
+
+ // wait for bookie watcher backoff time.
+ TimeUnit.SECONDS.sleep(1);
+
+ // should success to detect newly added bookies
+ try {
+ lh = bkc.createLedger(3, 2, 2, BookKeeper.DigestType.CRC32, new byte[] {});
+ lh.close();
+ if (!reconnectable) {
+ fail("Should fail to create ledger due to bookie watcher could not survive after session expire.");
+ }
+ } catch (BKException bke) {
+ if (reconnectable) {
+ fail("Should not fail to create ledger due to bookie watcher could survive after session expire.");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index b1a53e5..692ddce 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -272,7 +272,7 @@ public class AuditorLedgerCheckerTest extends MultiLedgerManagerTestCase {
ServerConfiguration bookieConf = bsConfs.get(2);
BookieServer bk = bs.get(2);
bookieConf.setReadOnlyModeEnabled(true);
- bk.getBookie().transitionToReadOnlyMode();
+ bk.getBookie().doTransitionToReadOnlyMode();
// grace period for publishing the bk-ledger
LOG.debug("Waiting for Auditor to finish ledger check.");
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 0490deb..9591ef8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -530,7 +530,7 @@ public class TestReplicationWorker extends MultiLedgerManagerTestCase {
try {
BookieServer newBk = bs.get(bs.size() - 1);
bsConfs.get(bsConfs.size() - 1).setReadOnlyModeEnabled(true);
- newBk.getBookie().transitionToReadOnlyMode();
+ newBk.getBookie().doTransitionToReadOnlyMode();
underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath) && rw.isRunning()) {
Thread.sleep(100);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
index a3bd4d6..573bc15 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java
@@ -28,6 +28,8 @@ import org.junit.After;
import static org.junit.Assert.*;
import org.apache.bookkeeper.conf.ServerConfiguration;
+
+import java.net.InetAddress;
import java.util.HashSet;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.bookie.Bookie;
@@ -89,16 +91,9 @@ public class BookieZKExpireTest extends BookKeeperClusterTestCase {
sendthread.resume();
// allow watcher thread to run
- secondsToWait = 20;
- while (server.isBookieRunning()
- || server.isRunning()) {
- Thread.sleep(1000);
- if (secondsToWait-- <= 0) {
- break;
- }
- }
- assertFalse("Bookie should have shutdown on losing zk session", server.isBookieRunning());
- assertFalse("Bookie Server should have shutdown on losing zk session", server.isRunning());
+ Thread.sleep(3000);
+ assertTrue("Bookie should not shutdown on losing zk session", server.isBookieRunning());
+ assertTrue("Bookie Server should not shutdown on losing zk session", server.isRunning());
} finally {
server.shutdown();
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
index 771a8a1..124a420 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
@@ -248,7 +248,7 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
killBookie(1);
baseConf.setReadOnlyModeEnabled(true);
startNewBookie();
- bs.get(1).getBookie().transitionToReadOnlyMode();
+ bs.get(1).getBookie().doTransitionToReadOnlyMode();
try {
bkc.readBookiesBlocking();
bkc.createLedger(2, 2, DigestType.CRC32, "".getBytes());
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/92722ee9/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
index 0c23aaf..d829db5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
@@ -106,8 +106,12 @@ public class TestZooKeeperClient extends TestCase {
class ShutdownZkServerClient extends ZooKeeperClient {
ShutdownZkServerClient(String connectString, int sessionTimeoutMs,
- ZooKeeperWatcherBase watcher, RetryPolicy operationRetryPolicy)throws IOException {
- super(connectString, sessionTimeoutMs, watcher, operationRetryPolicy, null, NullStatsLogger.INSTANCE, 1, 0);
+ ZooKeeperWatcherBase watcher, RetryPolicy operationRetryPolicy)
+ throws IOException {
+ super(connectString, sessionTimeoutMs, watcher,
+ new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE),
+ operationRetryPolicy,
+ NullStatsLogger.INSTANCE, 1, 0);
}
@Override