You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/10/27 18:58:26 UTC
[bookkeeper] branch master updated: ISSUE #662: Introduce Bookie
Registration Manager for bookie server
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new e9b8570 ISSUE #662: Introduce Bookie Registration Manager for bookie server
e9b8570 is described below
commit e9b857092703719c9ea38a3a74107fe5114af94a
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Fri Oct 27 11:58:19 2017 -0700
ISSUE #662: Introduce Bookie Registration Manager for bookie server
Descriptions of the changes in this PR:
BOOKKEEPER-628 aims to improve/generalize the bookie registration process.
This PR covers the bookie side: it introduces a Bookie Registration Manager for the bookie server.
Author: Jia Zhai <zh...@apache.org>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #663 from zhaijack/bookie_registration, closes #662
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 354 +++++--------------
.../apache/bookkeeper/bookie/BookieException.java | 86 ++++-
.../org/apache/bookkeeper/bookie/BookieShell.java | 102 +++---
.../java/org/apache/bookkeeper/bookie/Cookie.java | 177 ++++------
.../bookkeeper/bookie/FileSystemUpgrade.java | 53 ++-
.../bookkeeper/conf/ServerConfiguration.java | 28 ++
.../bookkeeper/discover/RegistrationClient.java | 36 ++
.../bookkeeper/discover/RegistrationManager.java | 108 ++++++
.../bookkeeper/discover/ZKRegistrationManager.java | 391 +++++++++++++++++++++
.../apache/bookkeeper/discover/package-info.java | 22 ++
.../bookkeeper/http/ExpandStorageService.java | 9 +-
.../bookkeeper/http/RecoveryBookieService.java | 12 +-
.../bookie/BookieInitializationTest.java | 79 ++++-
.../org/apache/bookkeeper/bookie/CookieTest.java | 36 +-
.../bookkeeper/bookie/UpdateCookieCmdTest.java | 39 +-
15 files changed, 1040 insertions(+), 492 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 220aa4c..953fb17 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -21,13 +21,11 @@
package org.apache.bookkeeper.bookie;
-import static com.google.common.base.Charsets.UTF_8;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY_BYTES;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_RECOVERY_ADD_ENTRY;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
@@ -54,25 +52,28 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
+import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
+import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -87,24 +88,15 @@ import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,18 +121,14 @@ public class Bookie extends BookieCriticalThread {
static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
static final long METAENTRY_ID_FENCE_KEY = -0x2000;
- // ZK registration path for this bookie
- protected final String bookieRegistrationPath;
- protected final String bookieReadonlyRegistrationPath;
-
private final LedgerDirsManager ledgerDirsManager;
private LedgerDirsManager indexDirsManager;
LedgerDirsMonitor ledgerMonitor;
LedgerDirsMonitor idxMonitor;
- // ZooKeeper client instance for the Bookie
- ZooKeeper zk;
+ // Registration Manager for managing registration
+ RegistrationManager registrationManager;
// Running flag
private volatile boolean running = false;
@@ -151,11 +139,9 @@ public class Bookie extends BookieCriticalThread {
private final ConcurrentLongHashMap<byte[]> masterKeyCache = new ConcurrentLongHashMap<>();
- protected final String zkBookieRegPath;
- protected final String zkBookieReadOnlyPath;
- protected final List<ACL> zkAcls;
+ protected final String bookieId;
- private final AtomicBoolean zkRegistered = new AtomicBoolean(false);
+ private final AtomicBoolean rmRegistered = new AtomicBoolean(false);
protected final AtomicBoolean readOnly = new AtomicBoolean(false);
// executor to manage the state changes for a bookie.
final ExecutorService stateService = Executors.newSingleThreadExecutor(
@@ -254,19 +240,29 @@ public class Bookie extends BookieCriticalThread {
}
}
+ @VisibleForTesting
+ public void setRegistrationManager(RegistrationManager rm) {
+ this.registrationManager = rm;
+ }
+
+ @VisibleForTesting
+ public RegistrationManager getRegistrationManager() {
+ return this.registrationManager;
+ }
+
/**
* Check that the environment for the bookie is correct.
* This means that the configuration has stayed the same as the
* first run and the filesystem structure is up to date.
*/
- private void checkEnvironment(ZooKeeper zk) throws BookieException, IOException {
+ private void checkEnvironment(RegistrationManager rm) throws BookieException, IOException {
List<File> allLedgerDirs = new ArrayList<File>(ledgerDirsManager.getAllLedgerDirs().size()
+ indexDirsManager.getAllLedgerDirs().size());
allLedgerDirs.addAll(ledgerDirsManager.getAllLedgerDirs());
if (indexDirsManager != ledgerDirsManager) {
allLedgerDirs.addAll(indexDirsManager.getAllLedgerDirs());
}
- if (zk == null) { // exists only for testing, just make sure directories are correct
+ if (rm == null) { // exists only for testing, just make sure directories are correct
for (File journalDirectory : journalDirectories) {
checkDirectoryStructure(journalDirectory);
@@ -279,7 +275,7 @@ public class Bookie extends BookieCriticalThread {
}
if (conf.getAllowStorageExpansion()) {
- checkEnvironmentWithStorageExpansion(conf, zk, journalDirectories, allLedgerDirs);
+ checkEnvironmentWithStorageExpansion(conf, rm, journalDirectories, allLedgerDirs);
return;
}
@@ -303,20 +299,20 @@ public class Bookie extends BookieCriticalThread {
}
}
- String instanceId = getInstanceId(conf, zk);
+ String instanceId = rm.getClusterInstanceId();
Cookie.Builder builder = Cookie.generateCookie(conf);
if (null != instanceId) {
builder.setInstanceId(instanceId);
}
Cookie masterCookie = builder.build();
- Versioned<Cookie> zkCookie = null;
+ Versioned<Cookie> rmCookie = null;
try {
- zkCookie = Cookie.readFromZooKeeper(zk, conf);
+ rmCookie = Cookie.readFromRegistrationManager(rm, conf);
// If allowStorageExpansion option is set, we should
// make sure that the new set of ledger/index dirs
// is a super set of the old; else, we fail the cookie check
- masterCookie.verifyIsSuperSet(zkCookie.getValue());
- } catch (KeeperException.NoNodeException nne) {
+ masterCookie.verifyIsSuperSet(rmCookie.getValue());
+ } catch (CookieNotFoundException e) {
// can occur in cases:
// 1) new environment or
// 2) done only metadata format and started bookie server.
@@ -381,7 +377,7 @@ public class Bookie extends BookieCriticalThread {
for (File dir : allLedgerDirs) {
masterCookie.writeToDirectory(dir);
}
- masterCookie.writeToZooKeeper(zk, conf, zkCookie != null ? zkCookie.getVersion() : Version.NEW);
+ masterCookie.writeToRegistrationManager(rm, conf, rmCookie != null ? rmCookie.getVersion() : Version.NEW);
}
List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
@@ -389,18 +385,10 @@ public class Bookie extends BookieCriticalThread {
List<File> indexDirs = indexDirsManager.getAllLedgerDirs();
checkIfDirsOnSameDiskPartition(indexDirs);
checkIfDirsOnSameDiskPartition(journalDirectories);
- } catch (KeeperException ke) {
- LOG.error("Couldn't access cookie in zookeeper", ke);
- throw new BookieException.InvalidCookieException(ke);
- } catch (UnknownHostException uhe) {
- LOG.error("Couldn't check cookies, networking is broken", uhe);
- throw new BookieException.InvalidCookieException(uhe);
+
} catch (IOException ioe) {
LOG.error("Error accessing cookie on disks", ioe);
throw new BookieException.InvalidCookieException(ioe);
- } catch (InterruptedException ie) {
- LOG.error("Thread interrupted while checking cookies, exiting", ie);
- throw new BookieException.InvalidCookieException(ie);
}
}
@@ -450,8 +438,11 @@ public class Bookie extends BookieCriticalThread {
}
}
- public static void checkEnvironmentWithStorageExpansion(ServerConfiguration conf,
- ZooKeeper zk, List<File> journalDirectories, List<File> allLedgerDirs) throws BookieException, IOException {
+ public static void checkEnvironmentWithStorageExpansion(
+ ServerConfiguration conf,
+ RegistrationManager rm,
+ List<File> journalDirectories,
+ List<File> allLedgerDirs) throws BookieException {
try {
boolean newEnv = false;
List<File> missedCookieDirs = new ArrayList<File>();
@@ -472,20 +463,20 @@ public class Bookie extends BookieCriticalThread {
}
}
- String instanceId = getInstanceId(conf, zk);
+ String instanceId = rm.getClusterInstanceId();
Cookie.Builder builder = Cookie.generateCookie(conf);
if (null != instanceId) {
builder.setInstanceId(instanceId);
}
Cookie masterCookie = builder.build();
- Versioned<Cookie> zkCookie = null;
+ Versioned<Cookie> rmCookie = null;
try {
- zkCookie = Cookie.readFromZooKeeper(zk, conf);
+ rmCookie = Cookie.readFromRegistrationManager(rm, conf);
// If allowStorageExpansion option is set, we should
// make sure that the new set of ledger/index dirs
// is a super set of the old; else, we fail the cookie check
- masterCookie.verifyIsSuperSet(zkCookie.getValue());
- } catch (KeeperException.NoNodeException nne) {
+ masterCookie.verifyIsSuperSet(rmCookie.getValue());
+ } catch (CookieNotFoundException e) {
// can occur in cases:
// 1) new environment or
// 2) done only metadata format and started bookie server.
@@ -550,20 +541,11 @@ public class Bookie extends BookieCriticalThread {
for (File dir : allLedgerDirs) {
masterCookie.writeToDirectory(dir);
}
- masterCookie.writeToZooKeeper(zk, conf, zkCookie != null ? zkCookie.getVersion() : Version.NEW);
+ masterCookie.writeToRegistrationManager(rm, conf, rmCookie != null ? rmCookie.getVersion() : Version.NEW);
}
- } catch (KeeperException ke) {
- LOG.error("Couldn't access cookie in zookeeper", ke);
- throw new BookieException.InvalidCookieException(ke);
- } catch (UnknownHostException uhe) {
- LOG.error("Couldn't check cookies, networking is broken", uhe);
- throw new BookieException.InvalidCookieException(uhe);
} catch (IOException ioe) {
LOG.error("Error accessing cookie on disks", ioe);
throw new BookieException.InvalidCookieException(ioe);
- } catch (InterruptedException ie) {
- LOG.error("Thread interrupted while checking cookies, exiting", ie);
- throw new BookieException.InvalidCookieException(ie);
}
}
@@ -605,25 +587,6 @@ public class Bookie extends BookieCriticalThread {
return addr;
}
- private static String getInstanceId(ServerConfiguration conf, ZooKeeper zk) throws KeeperException,
- InterruptedException {
- String instanceId = null;
- if (zk.exists(conf.getZkLedgersRootPath(), null) == null) {
- LOG.error("BookKeeper metadata doesn't exist in zookeeper. "
- + "Has the cluster been initialized? "
- + "Try running bin/bookkeeper shell metaformat");
- throw new KeeperException.NoNodeException("BookKeeper metadata");
- }
- try {
- byte[] data = zk.getData(conf.getZkLedgersRootPath() + "/"
- + BookKeeperConstants.INSTANCEID, false, null);
- instanceId = new String(data, UTF_8);
- } catch (KeeperException.NoNodeException e) {
- LOG.info("INSTANCEID not exists in zookeeper. Not considering it for data verification");
- }
- return instanceId;
- }
-
public LedgerDirsManager getLedgerDirsManager() {
return ledgerDirsManager;
}
@@ -653,18 +616,14 @@ public class Bookie extends BookieCriticalThread {
}
public Bookie(ServerConfiguration conf)
- throws IOException, KeeperException, InterruptedException, BookieException {
+ throws IOException, InterruptedException, BookieException {
this(conf, NullStatsLogger.INSTANCE);
}
public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
- throws IOException, KeeperException, InterruptedException, BookieException {
+ throws IOException, InterruptedException, BookieException {
super("Bookie-" + conf.getBookiePort());
this.statsLogger = statsLogger;
- this.zkAcls = ZkUtils.getACLs(conf);
- this.bookieRegistrationPath = conf.getZkAvailableBookiesPath() + "/";
- this.bookieReadonlyRegistrationPath =
- this.bookieRegistrationPath + BookKeeperConstants.READONLY;
this.conf = conf;
this.journalDirectories = Lists.newArrayList();
for (File journalDirectory : conf.getJournalDirs()) {
@@ -683,9 +642,20 @@ public class Bookie extends BookieCriticalThread {
}
// instantiate zookeeper client to initialize ledger manager
- this.zk = instantiateZookeeperClient(conf);
- checkEnvironment(this.zk);
- ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
+ this.registrationManager = instantiateRegistrationManager(conf);
+ checkEnvironment(this.registrationManager);
+ try {
+ ZooKeeper zooKeeper = null; // ZooKeeper is null existing only for testing
+ if (registrationManager != null) {
+ zooKeeper = ((ZKRegistrationManager) this.registrationManager).getZk();
+ }
+ // currently the registration manager is zookeeper only
+ ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(
+ conf,
+ zooKeeper);
+ } catch (KeeperException e) {
+ throw new MetadataStoreException("Failed to initialize ledger manager", e);
+ }
LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
ledgerManager = ledgerManagerFactory.newLedgerManager();
@@ -721,9 +691,7 @@ public class Bookie extends BookieCriticalThread {
}
// ZK ephemeral node for this Bookie.
- String myID = getMyId();
- zkBookieRegPath = this.bookieRegistrationPath + myID;
- zkBookieReadOnlyPath = this.bookieReadonlyRegistrationPath + "/" + myID;
+ this.bookieId = getMyId();
// instantiate the journals
journals = Lists.newArrayList();
@@ -760,7 +728,7 @@ public class Bookie extends BookieCriticalThread {
@Override
public Number getSample() {
- return zkRegistered.get() ? (readOnly.get() ? 0 : 1) : -1;
+ return rmRegistered.get() ? (readOnly.get() ? 0 : 1) : -1;
}
});
}
@@ -932,65 +900,23 @@ public class Bookie extends BookieCriticalThread {
}
/**
- * Instantiate the ZooKeeper client for the Bookie.
+ * Instantiate the registration manager for the Bookie.
*/
- private ZooKeeper instantiateZookeeperClient(ServerConfiguration conf)
- throws IOException, InterruptedException, KeeperException {
- if (conf.getZkServers() == null) {
- LOG.warn("No ZK servers passed to Bookie constructor so BookKeeper clients won't know about this server!");
- return null;
- }
- // Create the ZooKeeper client instance
- return newZookeeper(conf);
- }
-
- /**
- * Check existence of <i>regPath</i> and wait it expired if possible.
- *
- * @param regPath reg node path.
- * @return true if regPath exists, otherwise return false
- * @throws IOException if can't create reg path
- */
- protected boolean checkRegNodeAndWaitExpired(String regPath) throws IOException {
- final CountDownLatch prevNodeLatch = new CountDownLatch(1);
- Watcher zkPrevRegNodewatcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // Check for prev znode deletion. Connection expiration is
- // not handling, since bookie has logic to shutdown.
- if (EventType.NodeDeleted == event.getType()) {
- prevNodeLatch.countDown();
- }
- }
- };
+ private RegistrationManager instantiateRegistrationManager(ServerConfiguration conf) throws BookieException {
+ // Create the registration manager instance
+ Class<? extends RegistrationManager> managerCls;
try {
- Stat stat = zk.exists(regPath, zkPrevRegNodewatcher);
- if (null != stat) {
- // if the ephemeral owner isn't current zookeeper client
- // wait for it to be expired.
- if (stat.getEphemeralOwner() != zk.getSessionId()) {
- LOG.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:"
- + " {} ms for znode deletion", regPath, conf.getZkTimeout());
- // waiting for the previous bookie reg znode deletion
- if (!prevNodeLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)) {
- throw new NodeExistsException(regPath);
- } else {
- return false;
- }
- }
- return true;
- } else {
- return false;
- }
- } catch (KeeperException ke) {
- LOG.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke);
- throw new IOException("ZK exception checking and wait ephemeral znode "
- + regPath + " expired", ke);
- } catch (InterruptedException ie) {
- LOG.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie);
- throw new IOException("Interrupted checking and wait ephemeral znode "
- + regPath + " expired", ie);
+ managerCls = conf.getRegistrationManagerClass();
+ } catch (ConfigurationException e) {
+ throw new BookieIllegalOpException(e);
}
+
+ RegistrationManager manager = ReflectionUtils.newInstance(managerCls);
+ return manager.initialize(conf, () -> {
+ rmRegistered.set(false);
+ // schedule a re-register operation
+ registerBookie(false);
+ }, statsLogger);
}
/**
@@ -1016,40 +942,28 @@ public class Bookie extends BookieCriticalThread {
}
protected void doRegisterBookie() throws IOException {
- doRegisterBookie(readOnly.get() ? zkBookieReadOnlyPath : zkBookieRegPath);
+ doRegisterBookie(readOnly.get());
}
- private void doRegisterBookie(final String regPath) throws IOException {
- if (null == zk) {
- // zookeeper instance is null, means not register itself to zk
+ private void doRegisterBookie(boolean isReadOnly) throws IOException {
+ if (null == registrationManager ||
+ ((ZKRegistrationManager) this.registrationManager).getZk() == null) {
+ // registration manager is null, meaning the bookie does not register itself to zk.
+ // A null ZooKeeper handle exists only for testing.
+ LOG.info("null zk while do register");
return;
}
- zkRegistered.set(false);
-
- // ZK ephemeral node for this Bookie.
+ rmRegistered.set(false);
try {
- if (!checkRegNodeAndWaitExpired(regPath)) {
- // Create the ZK ephemeral node for this Bookie.
- zk.create(regPath, new byte[0], zkAcls, CreateMode.EPHEMERAL);
- LOG.info("Registered myself in ZooKeeper at {}.", regPath);
- }
- zkRegistered.set(true);
- } catch (KeeperException ke) {
- LOG.error("ZK exception registering ephemeral Znode for Bookie!", ke);
- // Throw an IOException back up. This will cause the Bookie
- // constructor to error out. Alternatively, we could do a System
- // exit here as this is a fatal error.
- throw new IOException(ke);
- } catch (InterruptedException ie) {
- LOG.error("Interrupted exception registering ephemeral Znode for Bookie!", ie);
- // Throw an IOException back up. This will cause the Bookie
- // constructor to error out. Alternatively, we could do a System
- // exit here as this is a fatal error.
- throw new IOException(ie);
+ registrationManager.registerBookie(bookieId, isReadOnly);
+ rmRegistered.set(true);
+ } catch (BookieException e) {
+ throw new IOException(e);
}
}
+
/**
* Transition the bookie from readOnly mode to writable.
*/
@@ -1073,11 +987,11 @@ public class Bookie extends BookieCriticalThread {
}
LOG.info("Transitioning Bookie to Writable mode and will serve read/write requests.");
// change zookeeper state only when using zookeeper
- if (null == zk) {
+ if (null == registrationManager) {
return;
}
try {
- doRegisterBookie(zkBookieRegPath);
+ doRegisterBookie(false);
} catch (IOException e) {
LOG.warn("Error in transitioning back to writable mode : ", e);
transitionToReadOnlyMode();
@@ -1085,12 +999,8 @@ public class Bookie extends BookieCriticalThread {
}
// clear the readonly state
try {
- zk.delete(zkBookieReadOnlyPath, -1);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted clearing readonly state while transitioning to writable mode : ", e);
- return;
- } catch (KeeperException e) {
+ registrationManager.unregisterBookie(bookieId, true);
+ } catch (BookieException e) {
// if we failed when deleting the readonly flag in zookeeper, it is OK since client would
// already see the bookie in writable list. so just log the exception
LOG.warn("Failed to delete bookie readonly state in zookeeper : ", e);
@@ -1130,40 +1040,16 @@ public class Bookie extends BookieCriticalThread {
LOG.info("Transitioning Bookie to ReadOnly mode,"
+ " and will serve only read requests from clients!");
// change zookeeper state only when using zookeeper
- if (null == zk) {
+ if (null == registrationManager) {
return;
}
try {
- if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) {
- try {
- zk.create(this.bookieReadonlyRegistrationPath, new byte[0],
- zkAcls, CreateMode.PERSISTENT);
- } catch (NodeExistsException e) {
- // this node is just now created by someone.
- }
- }
- doRegisterBookie(zkBookieReadOnlyPath);
- try {
- // Clear the current registered node
- zk.delete(zkBookieRegPath, -1);
- } catch (KeeperException.NoNodeException nne) {
- LOG.warn("No writable bookie registered node {} when transitioning to readonly",
- zkBookieRegPath, nne);
- }
- } catch (IOException e) {
- LOG.error("Error in transition to ReadOnly Mode."
- + " Shutting down", e);
- triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
- return;
- } catch (KeeperException e) {
+ registrationManager.registerBookie(bookieId, true);
+ } catch (BookieException e) {
LOG.error("Error in transition to ReadOnly Mode."
+ " Shutting down", e);
triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
return;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("Interrupted Exception while transitioning to ReadOnly Mode.");
- return;
}
}
@@ -1174,54 +1060,6 @@ public class Bookie extends BookieCriticalThread {
return readOnly.get();
}
- /**
- * Create a new zookeeper client to zk cluster.
- *
- * <p>
- * Bookie Server just used zk client when syncing ledgers for garbage collection.
- * So when zk client is expired, it means this bookie server is not available in
- * bookie server list. The bookie client will be notified for its expiration. No
- * more bookie request will be sent to this server. So it's better to exit when zk
- * expired.
- * </p>
- * <p>
- * Since there are lots of bk operations cached in queue, so we wait for all the operations
- * are processed and quit. It is done by calling <b>shutdown</b>.
- * </p>
- *
- * @param conf server configuration
- *
- * @return zk client instance
- */
- private ZooKeeper newZookeeper(final ServerConfiguration conf)
- throws IOException, InterruptedException, KeeperException {
- Set<Watcher> watchers = new HashSet<Watcher>();
- watchers.add(new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- if (!running) {
- // do nothing until first registration
- return;
- }
- // Check for expired connection.
- if (event.getType().equals(EventType.None) && event.getState().equals(KeeperState.Expired)) {
- zkRegistered.set(false);
- // schedule a re-register operation
- registerBookie(false);
- }
- }
- });
- return ZooKeeperClient.newBuilder()
- .connectString(conf.getZkServers())
- .sessionTimeoutMs(conf.getZkTimeout())
- .watchers(watchers)
- .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkRetryBackoffStartMs(),
- conf.getZkRetryBackoffMaxMs(), Integer.MAX_VALUE))
- .requestRateLimit(conf.getZkRequestRateLimit())
- .statsLogger(this.statsLogger.scope(BOOKIE_SCOPE))
- .build();
- }
-
public boolean isRunning() {
return running;
}
@@ -1327,8 +1165,8 @@ public class Bookie extends BookieCriticalThread {
stateService.shutdown();
}
// Shutdown the ZK client
- if (zk != null) {
- zk.close();
+ if (registrationManager != null) {
+ registrationManager.close();
}
} catch (InterruptedException ie) {
LOG.error("Interrupted during shutting down bookie : ", ie);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
index 57bf708..99ae39e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
@@ -40,6 +40,10 @@ public abstract class BookieException extends Exception {
super(reason);
}
+ public BookieException(int code, String reason, Throwable t) {
+ super(reason, t);
+ }
+
public static BookieException create(int code) {
switch(code) {
case Code.UnauthorizedAccessException:
@@ -52,6 +56,12 @@ public abstract class BookieException extends Exception {
return new UpgradeException();
case Code.DiskPartitionDuplicationException:
return new DiskPartitionDuplicationException();
+ case Code.CookieNotFoundException:
+ return new CookieNotFoundException();
+ case Code.MetadataStoreException:
+ return new MetadataStoreException();
+ case Code.UnknownBookieIdException:
+ return new UnknownBookieIdException();
default:
return new BookieIllegalOpException();
}
@@ -66,10 +76,12 @@ public abstract class BookieException extends Exception {
int IllegalOpException = -100;
int LedgerFencedException = -101;
-
int InvalidCookieException = -102;
int UpgradeException = -103;
int DiskPartitionDuplicationException = -104;
+ int CookieNotFoundException = -105;
+ int MetadataStoreException = -106;
+ int UnknownBookieIdException = -107;
}
public void setCode(int code) {
@@ -101,6 +113,15 @@ public abstract class BookieException extends Exception {
case Code.DiskPartitionDuplicationException:
err = "Disk Partition Duplication is not allowed";
break;
+ case Code.CookieNotFoundException:
+ err = "Cookie not found";
+ break;
+ case Code.MetadataStoreException:
+ err = "Error performing metadata operations";
+ break;
+ case Code.UnknownBookieIdException:
+ err = "Unknown bookie id";
+ break;
default:
err = "Invalid operation";
break;
@@ -132,7 +153,15 @@ public abstract class BookieException extends Exception {
*/
public static class BookieIllegalOpException extends BookieException {
public BookieIllegalOpException() {
- super(Code.UnauthorizedAccessException);
+ super(Code.IllegalOpException);
+ }
+
+ public BookieIllegalOpException(String reason) {
+ super(Code.IllegalOpException, reason);
+ }
+
+ public BookieIllegalOpException(Throwable cause) {
+ super(Code.IllegalOpException, cause);
}
}
@@ -165,6 +194,23 @@ public abstract class BookieException extends Exception {
}
/**
+ * Signal that no cookie is found when starting a bookie.
+ */
+ public static class CookieNotFoundException extends BookieException {
+ public CookieNotFoundException() {
+ this("");
+ }
+
+ public CookieNotFoundException(String reason) {
+ super(Code.CookieNotFoundException, reason);
+ }
+
+ public CookieNotFoundException(Throwable cause) {
+ super(Code.CookieNotFoundException, cause);
+ }
+ }
+
+ /**
* Signals that an exception occurs on upgrading a bookie.
*/
public static class UpgradeException extends BookieException {
@@ -197,4 +243,40 @@ public abstract class BookieException extends Exception {
super(Code.DiskPartitionDuplicationException, reason);
}
}
+
+ /**
+ * Signal when bookie has problems on accessing metadata store.
+ */
+ public static class MetadataStoreException extends BookieException {
+
+ public MetadataStoreException() {
+ this("");
+ }
+
+ public MetadataStoreException(String reason) {
+ super(Code.MetadataStoreException, reason);
+ }
+
+ public MetadataStoreException(Throwable cause) {
+ super(Code.MetadataStoreException, cause);
+ }
+
+ public MetadataStoreException(String reason, Throwable cause) {
+ super(Code.MetadataStoreException, reason, cause);
+ }
+ }
+
+ /**
+ * Signal that a bookie id is unknown.
+ */
+ public static class UnknownBookieIdException extends BookieException {
+
+ public UnknownBookieIdException() {
+ super(Code.UnknownBookieIdException);
+ }
+
+ public UnknownBookieIdException(Throwable cause) {
+ super(Code.UnknownBookieIdException, cause);
+ }
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 1c08eb2..481c647 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -47,6 +47,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
@@ -61,6 +62,8 @@ import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.client.UpdateLedgerOp;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
@@ -69,6 +72,7 @@ import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.replication.AuditorElector;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.EntryFormatter;
import org.apache.bookkeeper.util.IOUtils;
@@ -88,7 +92,6 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -261,18 +264,15 @@ public class BookieShell implements Tool {
boolean result = Bookie.format(conf, interactive, force);
// delete cookie
if (cmdLine.hasOption("d")) {
- ZooKeeperClient zkc =
- ZooKeeperClient.newBuilder()
- .connectString(conf.getZkServers())
- .sessionTimeoutMs(conf.getZkTimeout())
- .build();
+ RegistrationManager rm = new ZKRegistrationManager();
+ rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
try {
- Versioned<Cookie> cookie = Cookie.readFromZooKeeper(zkc, conf);
- cookie.getValue().deleteFromZooKeeper(zkc, conf, cookie.getVersion());
- } catch (KeeperException.NoNodeException nne) {
+ Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, conf);
+ cookie.getValue().deleteFromRegistrationManager(rm, conf, cookie.getVersion());
+ } catch (CookieNotFoundException nne) {
LOG.warn("No cookie to remove : ", nne);
} finally {
- zkc.close();
+ rm.close();
}
}
return (result) ? 0 : 1;
@@ -324,7 +324,7 @@ public class BookieShell implements Tool {
private int bkRecovery(ClientConfiguration conf, BookKeeperAdmin bkAdmin,
String[] args, boolean deleteCookie)
- throws InterruptedException, BKException, KeeperException, IOException {
+ throws InterruptedException, BKException, BookieException, IOException {
final String bookieSrcString[] = args[0].split(":");
if (bookieSrcString.length != 2) {
System.err.println("BookieSrc inputted has invalid format"
@@ -347,10 +347,14 @@ public class BookieShell implements Tool {
bkAdmin.recoverBookieData(bookieSrc, bookieDest);
if (deleteCookie) {
+ ServerConfiguration serverConf = new ServerConfiguration();
+ serverConf.addConfiguration(conf);
+ RegistrationManager rm = new ZKRegistrationManager();
try {
- Versioned<Cookie> cookie = Cookie.readFromZooKeeper(bkAdmin.getZooKeeper(), conf, bookieSrc);
- cookie.getValue().deleteFromZooKeeper(bkAdmin.getZooKeeper(), conf, bookieSrc, cookie.getVersion());
- } catch (KeeperException.NoNodeException nne) {
+ rm.initialize(serverConf, () -> {}, NullStatsLogger.INSTANCE);
+ Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, bookieSrc);
+ cookie.getValue().deleteFromRegistrationManager(rm, bookieSrc, cookie.getVersion());
+ } catch (CookieNotFoundException nne) {
LOG.warn("No cookie to remove for {} : ", bookieSrc, nne);
}
}
@@ -1506,22 +1510,19 @@ public class BookieShell implements Tool {
return updateBookieIdInCookie(bookieId, useHostName);
}
- private int updateBookieIdInCookie(final String bookieId, final boolean useHostname) throws IOException,
+ private int updateBookieIdInCookie(final String bookieId, final boolean useHostname) throws BookieException,
InterruptedException {
- ZooKeeper zk = null;
+ RegistrationManager rm = new ZKRegistrationManager();
try {
- zk = ZooKeeperClient.newBuilder()
- .connectString(bkConf.getZkServers())
- .sessionTimeoutMs(bkConf.getZkTimeout())
- .build();
+ rm.initialize(bkConf, () -> {}, NullStatsLogger.INSTANCE);
ServerConfiguration conf = new ServerConfiguration(bkConf);
String newBookieId = Bookie.getBookieAddress(conf).toString();
// read oldcookie
Versioned<Cookie> oldCookie = null;
try {
conf.setUseHostNameAsBookieID(!useHostname);
- oldCookie = Cookie.readFromZooKeeper(zk, conf);
- } catch (KeeperException.NoNodeException nne) {
+ oldCookie = Cookie.readFromRegistrationManager(rm, conf);
+ } catch (CookieNotFoundException nne) {
LOG.error("Either cookie already updated with UseHostNameAsBookieID={} or no cookie exists!",
useHostname, nne);
return -1;
@@ -1540,12 +1541,12 @@ public class BookieShell implements Tool {
if (hasCookieUpdatedInDirs) {
try {
conf.setUseHostNameAsBookieID(useHostname);
- Cookie.readFromZooKeeper(zk, conf);
+ Cookie.readFromRegistrationManager(rm, conf);
// since newcookie exists, just do cleanup of oldcookie and return
conf.setUseHostNameAsBookieID(!useHostname);
- oldCookie.getValue().deleteFromZooKeeper(zk, conf, oldCookie.getVersion());
+ oldCookie.getValue().deleteFromRegistrationManager(rm, conf, oldCookie.getVersion());
return 0;
- } catch (KeeperException.NoNodeException nne) {
+ } catch (CookieNotFoundException nne) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring, cookie will be written to zookeeper");
}
@@ -1569,20 +1570,17 @@ public class BookieShell implements Tool {
}
// writes newcookie to zookeeper
conf.setUseHostNameAsBookieID(useHostname);
- newCookie.writeToZooKeeper(zk, conf, Version.NEW);
+ newCookie.writeToRegistrationManager(rm, conf, Version.NEW);
// delete oldcookie
conf.setUseHostNameAsBookieID(!useHostname);
- oldCookie.getValue().deleteFromZooKeeper(zk, conf, oldCookie.getVersion());
- } catch (KeeperException ke) {
- LOG.error("KeeperException during cookie updation!", ke);
- return -1;
+ oldCookie.getValue().deleteFromRegistrationManager(rm, conf, oldCookie.getVersion());
} catch (IOException ioe) {
LOG.error("IOException during cookie updation!", ioe);
return -1;
} finally {
- if (zk != null) {
- zk.close();
+ if (rm != null) {
+ rm.close();
}
}
return 0;
@@ -1628,31 +1626,33 @@ public class BookieShell implements Tool {
@Override
int runCmd(CommandLine cmdLine) {
ServerConfiguration conf = new ServerConfiguration(bkConf);
- ZooKeeper zk;
+ RegistrationManager rm = new ZKRegistrationManager();
try {
- zk = ZooKeeperClient.newBuilder()
- .connectString(bkConf.getZkServers())
- .sessionTimeoutMs(bkConf.getZkTimeout()).build();
- } catch (KeeperException | InterruptedException | IOException e) {
- LOG.error("Exception while establishing zookeeper connection.", e);
- return -1;
- }
+ try {
+ rm.initialize(bkConf, () -> {}, NullStatsLogger.INSTANCE);
+ } catch (BookieException e) {
+ LOG.error("Exception while establishing zookeeper connection.", e);
+ return -1;
+ }
- List<File> allLedgerDirs = Lists.newArrayList();
- allLedgerDirs.addAll(Arrays.asList(ledgerDirectories));
- if (indexDirectories != ledgerDirectories) {
- allLedgerDirs.addAll(Arrays.asList(indexDirectories));
- }
+ List<File> allLedgerDirs = Lists.newArrayList();
+ allLedgerDirs.addAll(Arrays.asList(ledgerDirectories));
+ if (indexDirectories != ledgerDirectories) {
+ allLedgerDirs.addAll(Arrays.asList(indexDirectories));
+ }
- try {
- Bookie.checkEnvironmentWithStorageExpansion(conf, zk,
+ try {
+ Bookie.checkEnvironmentWithStorageExpansion(conf, rm,
Lists.newArrayList(journalDirectories), allLedgerDirs);
- } catch (BookieException | IOException e) {
- LOG.error(
+ } catch (BookieException e) {
+ LOG.error(
"Exception while updating cookie for storage expansion", e);
- return -1;
+ return -1;
+ }
+ return 0;
+ } finally {
+ rm.close();
}
- return 0;
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
index ec7793c..e2d4f12 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
@@ -37,28 +37,23 @@ import java.io.OutputStreamWriter;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.util.List;
import java.util.Set;
-import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
+import org.apache.bookkeeper.bookie.BookieException.UnknownBookieIdException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.DataFormats.CookieFormat;
import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* When a bookie starts for the first time it generates a cookie, and stores
- * the cookie in zookeeper as well as in the each of the local filesystem
+ * the cookie in registration manager as well as in each of the local filesystem
* directories it uses. This cookie is used to ensure that for the life of the
* bookie, its configuration stays the same. If any of the bookie directories
* becomes unavailable, the bookie becomes unavailable. If the bookie changes
@@ -233,79 +228,61 @@ public class Cookie {
}
/**
- * Writes cookie details to ZooKeeper.
+ * Writes cookie details to registration manager.
*
- * @param zk ZooKeeper instance
+ * @param rm registration manager
* @param conf configuration
* @param version version
- * @throws KeeperException
- * @throws InterruptedException
- * @throws UnknownHostException
+ * @throws BookieException when fail to write the cookie.
*/
- public void writeToZooKeeper(ZooKeeper zk, ServerConfiguration conf, Version version)
- throws KeeperException, InterruptedException, UnknownHostException {
- List<ACL> zkAcls = ZkUtils.getACLs(conf);
- String bookieCookiePath = conf.getZkLedgersRootPath() + "/"
- + BookKeeperConstants.COOKIE_NODE;
- String zkPath = getZkPath(conf);
- byte[] data = toString().getBytes(UTF_8);
- if (Version.NEW == version) {
- if (zk.exists(bookieCookiePath, false) == null) {
- try {
- zk.create(bookieCookiePath, new byte[0],
- zkAcls, CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException nne) {
- LOG.info("More than one bookie tried to create {} at once. Safe to ignore",
- bookieCookiePath);
- }
- }
- zk.create(zkPath, data,
- zkAcls, CreateMode.PERSISTENT);
- } else {
- if (!(version instanceof LongVersion)) {
- throw new IllegalArgumentException("Invalid version type, expected ZkVersion type");
- }
- zk.setData(zkPath, data, (int) ((LongVersion) version).getLongVersion());
+ public void writeToRegistrationManager(RegistrationManager rm, ServerConfiguration conf, Version version)
+ throws BookieException {
+ BookieSocketAddress address = null;
+ try {
+ address = Bookie.getBookieAddress(conf);
+ } catch (UnknownHostException e) {
+ throw new UnknownBookieIdException(e);
}
+ byte[] data = toString().getBytes(UTF_8);
+ rm.writeCookie(address.toString(), new Versioned<>(data, version));
}
/**
- * Deletes cookie from ZooKeeper and sets znode version to DEFAULT_COOKIE_ZNODE_VERSION.
+ * Deletes cookie from registration manager.
*
- * @param zk ZooKeeper instance
+ * @param rm registration manager
* @param conf configuration
- * @param version zookeeper version
- * @throws KeeperException
- * @throws InterruptedException
- * @throws UnknownHostException
+ * @param version cookie version
+ * @throws BookieException when fail to delete cookie.
*/
- public void deleteFromZooKeeper(ZooKeeper zk, ServerConfiguration conf, Version version) throws KeeperException,
- InterruptedException, UnknownHostException {
- BookieSocketAddress address = Bookie.getBookieAddress(conf);
- deleteFromZooKeeper(zk, conf, address, version);
+ public void deleteFromRegistrationManager(RegistrationManager rm,
+ ServerConfiguration conf,
+ Version version) throws BookieException {
+ BookieSocketAddress address = null;
+ try {
+ address = Bookie.getBookieAddress(conf);
+ } catch (UnknownHostException e) {
+ throw new UnknownBookieIdException(e);
+ }
+ deleteFromRegistrationManager(rm, address, version);
}
/**
- * Delete cookie from zookeeper.
+ * Delete cookie from registration manager.
*
- * @param zk zookeeper client
- * @param conf configuration instance
+ * @param rm registration manager
* @param address bookie address
* @param version cookie version
- * @throws KeeperException
- * @throws InterruptedException
- * @throws UnknownHostException
+ * @throws BookieException when fail to delete cookie.
*/
- public void deleteFromZooKeeper(ZooKeeper zk, AbstractConfiguration conf,
- BookieSocketAddress address, Version version)
- throws KeeperException, InterruptedException, UnknownHostException {
+ public void deleteFromRegistrationManager(RegistrationManager rm,
+ BookieSocketAddress address,
+ Version version) throws BookieException {
if (!(version instanceof LongVersion)) {
throw new IllegalArgumentException("Invalid version type, expected ZkVersion type");
}
- String zkPath = getZkPath(conf, address);
- zk.delete(zkPath, (int) ((LongVersion) version).getLongVersion());
- LOG.info("Removed cookie from {} for bookie {}.", conf.getZkLedgersRootPath(), address);
+ rm.removeCookie(address.toString(), version);
}
/**
@@ -326,48 +303,44 @@ public class Cookie {
}
/**
- * Read cookie from ZooKeeper.
+ * Read cookie from registration manager.
*
- * @param zk ZooKeeper instance
+ * @param rm registration manager
* @param conf configuration
* @return versioned cookie object
- * @throws KeeperException
- * @throws InterruptedException
- * @throws IOException
- * @throws UnknownHostException
+ * @throws BookieException when fail to read cookie
*/
- public static Versioned<Cookie> readFromZooKeeper(ZooKeeper zk, ServerConfiguration conf)
- throws KeeperException, InterruptedException, IOException, UnknownHostException {
- return readFromZooKeeper(zk, conf, Bookie.getBookieAddress(conf));
+ public static Versioned<Cookie> readFromRegistrationManager(RegistrationManager rm, ServerConfiguration conf)
+ throws BookieException {
+ try {
+ return readFromRegistrationManager(rm, Bookie.getBookieAddress(conf));
+ } catch (UnknownHostException e) {
+ throw new UnknownBookieIdException(e);
+ }
}
/**
- * Read cookie from zookeeper for a given bookie <i>address</i>.
+ * Read cookie from registration manager for a given bookie <i>address</i>.
*
- * @param zk zookeeper client
- * @param conf configuration instance
+ * @param rm registration manager
* @param address bookie address
* @return versioned cookie object
- * @throws KeeperException
- * @throws InterruptedException
- * @throws IOException
- * @throws UnknownHostException
+ * @throws BookieException when fail to read cookie
*/
- public static Versioned<Cookie> readFromZooKeeper(ZooKeeper zk, AbstractConfiguration conf, BookieSocketAddress address)
- throws KeeperException, InterruptedException, IOException, UnknownHostException {
- String zkPath = getZkPath(conf, address);
-
- Stat stat = zk.exists(zkPath, false);
- byte[] data = zk.getData(zkPath, false, stat);
- BufferedReader reader = new BufferedReader(new StringReader(new String(data, UTF_8)));
+ public static Versioned<Cookie> readFromRegistrationManager(RegistrationManager rm,
+ BookieSocketAddress address) throws BookieException {
+ Versioned<byte[]> cookieData = rm.readCookie(address.toString());
+ BufferedReader reader = new BufferedReader(new StringReader(new String(cookieData.getValue(), UTF_8)));
try {
- Builder builder = parse(reader);
- Cookie cookie = builder.build();
- // sets stat version from ZooKeeper
- LongVersion version = new LongVersion(stat.getVersion());
- return new Versioned<Cookie>(cookie, version);
- } finally {
- reader.close();
+ try {
+ Builder builder = parse(reader);
+ Cookie cookie = builder.build();
+ return new Versioned<Cookie>(cookie, cookieData.getVersion());
+ } finally {
+ reader.close();
+ }
+ } catch (IOException ioe) {
+ throw new InvalidCookieException(ioe);
}
}
@@ -390,30 +363,6 @@ public class Cookie {
}
/**
- * Returns cookie path in zookeeper.
- *
- * @param conf configuration
- * @return cookie zk path
- * @throws UnknownHostException
- */
- static String getZkPath(ServerConfiguration conf)
- throws UnknownHostException {
- return getZkPath(conf, Bookie.getBookieAddress(conf));
- }
-
- /**
- * Return cookie path for a given bookie <i>address</i>.
- *
- * @param conf configuration
- * @param address bookie address
- * @return cookie path for bookie
- */
- static String getZkPath(AbstractConfiguration conf, BookieSocketAddress address) {
- String bookieCookiePath = conf.getZkLedgersRootPath() + "/" + BookKeeperConstants.COOKIE_NODE;
- return bookieCookiePath + "/" + address;
- }
-
- /**
* Check whether the 'bookieHost' was created using a hostname or an IP
* address. Represent as 'hostname/IPaddress' if the InetSocketAddress was
* created using hostname. Represent as '/IPaddress' if the
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
index 045f8b2..4158abd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
@@ -36,20 +36,19 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Scanner;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.HardLink;
+import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,22 +127,15 @@ public class FileSystemUpgrade {
}
}
- private static ZooKeeper newZookeeper(final ServerConfiguration conf)
+ private static RegistrationManager newRegistrationManager(final ServerConfiguration conf)
throws BookieException.UpgradeException {
+
try {
- int zkTimeout = conf.getZkTimeout();
- return ZooKeeperClient.newBuilder()
- .connectString(conf.getZkServers())
- .sessionTimeoutMs(zkTimeout)
- .operationRetryPolicy(
- new BoundExponentialBackoffRetryPolicy(zkTimeout, zkTimeout, Integer.MAX_VALUE))
- .build();
- } catch (InterruptedException ie) {
- throw new BookieException.UpgradeException(ie);
- } catch (IOException ioe) {
- throw new BookieException.UpgradeException(ioe);
- } catch (KeeperException ke) {
- throw new BookieException.UpgradeException(ke);
+ Class<? extends RegistrationManager> rmClass = conf.getRegistrationManagerClass();
+ RegistrationManager rm = ReflectionUtils.newInstance(rmClass);
+ return rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ } catch (Exception e) {
+ throw new BookieException.UpgradeException(e);
}
}
@@ -177,7 +169,7 @@ public class FileSystemUpgrade {
throws BookieException.UpgradeException, InterruptedException {
LOG.info("Upgrading...");
- ZooKeeper zk = newZookeeper(conf);
+ RegistrationManager rm = newRegistrationManager(conf);
try {
Map<File, File> deferredMoves = new HashMap<File, File>();
Cookie.Builder cookieBuilder = Cookie.generateCookie(conf);
@@ -229,15 +221,15 @@ public class FileSystemUpgrade {
}
try {
- c.writeToZooKeeper(zk, conf, Version.NEW);
- } catch (KeeperException ke) {
- LOG.error("Error writing cookie to zookeeper");
+ c.writeToRegistrationManager(rm, conf, Version.NEW);
+ } catch (BookieException ke) {
+ LOG.error("Error writing cookie to registration manager");
throw new BookieException.UpgradeException(ke);
}
} catch (IOException ioe) {
throw new BookieException.UpgradeException(ioe);
} finally {
- zk.close();
+ rm.close();
}
LOG.info("Done");
}
@@ -283,7 +275,7 @@ public class FileSystemUpgrade {
public static void rollback(ServerConfiguration conf)
throws BookieException.UpgradeException, InterruptedException {
LOG.info("Rolling back upgrade...");
- ZooKeeper zk = newZookeeper(conf);
+ RegistrationManager rm = newRegistrationManager(conf);
try {
for (File d : getAllDirectories(conf)) {
LOG.info("Rolling back {}", d);
@@ -305,17 +297,14 @@ public class FileSystemUpgrade {
}
}
try {
- Versioned<Cookie> cookie = Cookie.readFromZooKeeper(zk, conf);
- cookie.getValue().deleteFromZooKeeper(zk, conf, cookie.getVersion());
- } catch (KeeperException ke) {
- LOG.error("Error deleting cookie from ZooKeeper");
+ Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, conf);
+ cookie.getValue().deleteFromRegistrationManager(rm, conf, cookie.getVersion());
+ } catch (BookieException ke) {
+ LOG.error("Error deleting cookie from Registration Manager");
throw new BookieException.UpgradeException(ke);
- } catch (IOException ioe) {
- LOG.error("I/O Error deleting cookie from ZooKeeper");
- throw new BookieException.UpgradeException(ioe);
}
} finally {
- zk.close();
+ rm.close();
}
LOG.info("Done");
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index aae3e7d..10fb329 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -26,6 +26,8 @@ import com.google.common.annotations.Beta;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
@@ -173,6 +175,9 @@ public class ServerConfiguration extends AbstractConfiguration {
// Lifecycle Components
protected final static String EXTRA_SERVER_COMPONENTS = "extraServerComponents";
+ // Registration
+ protected final static String REGISTRATION_MANAGER_CLASS = "registrationManagerClass";
+
/**
* Construct a default configuration object
*/
@@ -2384,4 +2389,27 @@ public class ServerConfiguration extends AbstractConfiguration {
return this;
}
+ /**
+ * Set the registration manager class.
+ *
+ * @param regManagerClass
+ * the registration manager implementation class
+ */
+ public void setRegistrationManagerClass(
+ Class<? extends RegistrationManager> regManagerClass) {
+ setProperty(REGISTRATION_MANAGER_CLASS, regManagerClass);
+ }
+
+ /**
+ * Get Registration Manager Class.
+ *
+ * @return registration manager class.
+ */
+ public Class<? extends RegistrationManager> getRegistrationManagerClass()
+ throws ConfigurationException {
+ return ReflectionUtils.getClass(this, REGISTRATION_MANAGER_CLASS,
+ ZKRegistrationManager.class, RegistrationManager.class,
+ defaultLoader);
+ }
+
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
new file mode 100644
index 0000000..7f6160a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.discover;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A registration client, which the bookkeeper client will use to interact with the registration service.
+ */
+public interface RegistrationClient {
+
+ /**
+ * Get the list of available bookie identifiers.
+ *
+ * @return a future that represents the list of available bookies
+ */
+ CompletableFuture<List<String>> getAvailableBookies();
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
new file mode 100644
index 0000000..2b274d7
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.discover;
+
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+
+/**
+ * Registration manager interface, which a bookie server will use to do the registration process.
+ */
+@LimitedPrivate
+@Evolving
+public interface RegistrationManager extends AutoCloseable {
+
+ /**
+ * Listener that is notified about changes in the registration state.
+ */
+ @FunctionalInterface
+ interface RegistrationListener {
+
+ /**
+ * Signal when registration is expired.
+ */
+ void onRegistrationExpired();
+
+ }
+
+ RegistrationManager initialize(ServerConfiguration conf,
+ RegistrationListener listener,
+ StatsLogger statsLogger) throws BookieException;
+
+ @Override
+ void close();
+
+ /**
+ * Return the cluster instance id.
+ *
+ * @return the cluster instance id.
+ */
+ String getClusterInstanceId() throws BookieException;
+
+ /**
+ * Register the bookie server as <i>bookieId</i>.
+ *
+ * @param bookieId bookie id
+ * @param readOnly whether to register it as writable or readonly
+ * @throws BookieException when fail to register a bookie.
+ */
+ void registerBookie(String bookieId, boolean readOnly) throws BookieException;
+
+ /**
+ * Unregister the bookie server registered as <i>bookieId</i>.
+ *
+ * @param bookieId bookie id
+ * @param readOnly whether the registration to remove is the readonly one or the writable one
+ * @throws BookieException when fail to unregister a bookie.
+ */
+ void unregisterBookie(String bookieId, boolean readOnly) throws BookieException;
+
+ /**
+ * Write the cookie data, which will be used for verifying the integrity of the bookie environment.
+ *
+ * @param bookieId bookie id
+ * @param cookieData cookie data
+ * @throws BookieException when fail to write cookie
+ */
+ void writeCookie(String bookieId, Versioned<byte[]> cookieData) throws BookieException;
+
+ /**
+ * Read the cookie data, which will be used for verifying the integrity of the bookie environment.
+ *
+ * @param bookieId bookie id
+ * @return versioned cookie data
+ * @throws BookieException when fail to read cookie
+ */
+ Versioned<byte[]> readCookie(String bookieId) throws BookieException;
+
+ /**
+ * Remove the cookie data.
+ *
+ * @param bookieId bookie id
+ * @param version version of the cookie data
+ * @throws BookieException when fail to remove cookie
+ */
+ void removeCookie(String bookieId, Version version) throws BookieException;
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
new file mode 100644
index 0000000..dbe924d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.discover;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
+import static org.apache.bookkeeper.util.BookKeeperConstants.INSTANCEID;
+import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
+import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
+import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * ZooKeeper Based {@link RegistrationManager}.
+ */
+@Slf4j
+public class ZKRegistrationManager implements RegistrationManager {
+
+ private ServerConfiguration conf;
+ private ZooKeeper zk;
+ private List<ACL> zkAcls;
+ private volatile boolean running = false;
+
+ // cookie path
+ private String cookiePath;
+ // registration paths
+ protected String bookieRegistrationPath;
+ protected String bookieReadonlyRegistrationPath;
+
+ private StatsLogger statsLogger;
+
+    /**
+     * Initialize this manager from the server configuration and open the zookeeper client.
+     *
+     * <p>The ACLs, the cookie path and the writable/readonly registration paths are derived
+     * from <i>conf</i> before the zookeeper client is created.
+     *
+     * @param conf server configuration
+     * @param listener listener handed to the zookeeper session watcher
+     * @param statsLogger stats logger used by the zookeeper client
+     * @return this manager, or {@code null} when no zookeeper servers are configured
+     * @throws BookieException when the zookeeper client cannot be created
+     */
+    @Override
+    public RegistrationManager initialize(ServerConfiguration conf,
+                                          RegistrationListener listener,
+                                          StatsLogger statsLogger)
+            throws BookieException {
+        if (conf.getZkServers() == null) {
+            log.warn("No ZK servers passed to Bookie constructor so BookKeeper clients won't know about this server!");
+            return null;
+        }
+
+        // the stats logger has to be in place before newZookeeper() runs, since the client builder reads it
+        this.statsLogger = statsLogger;
+        this.conf = conf;
+        this.zkAcls = ZkUtils.getACLs(conf);
+
+        final String ledgersRootPath = conf.getZkLedgersRootPath();
+        this.cookiePath = ledgersRootPath + "/" + COOKIE_NODE;
+        this.bookieRegistrationPath = conf.getZkAvailableBookiesPath();
+        this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;
+
+        final ZooKeeper client;
+        try {
+            client = newZookeeper(conf, listener);
+        } catch (InterruptedException | KeeperException | IOException e) {
+            throw new MetadataStoreException(e);
+        }
+        this.zk = client;
+        return this;
+    }
+
+    /**
+     * Replace the zookeeper client used by this manager (test hook).
+     *
+     * @param zk zookeeper client to use from now on
+     */
+    @VisibleForTesting
+    public void setZk(ZooKeeper zk) {
+        this.zk = zk;
+    }
+
+    /**
+     * Expose the zookeeper client used by this manager (test hook).
+     *
+     * @return the current zookeeper client
+     */
+    @VisibleForTesting
+    public ZooKeeper getZk() {
+        return zk;
+    }
+
+    /**
+     * Build the zookeeper client used for registration and cookie management.
+     *
+     * <p>
+     * The client is created with a session watcher that calls
+     * {@code listener.onRegistrationExpired()} when the session expires. The watcher ignores
+     * every event while the <i>running</i> flag is false.
+     * </p>
+     * <p>
+     * Rationale: once the zk session is expired, this bookie is no longer in the available
+     * bookie list and clients stop sending requests to it, so the bookie is better off
+     * exiting. Pending bk operations are expected to be drained by the bookie's
+     * <b>shutdown</b> before it quits.
+     * </p>
+     *
+     * @param conf server configuration
+     * @param listener listener notified when the zookeeper session expires
+     *
+     * @return zk client instance
+     */
+    private ZooKeeper newZookeeper(final ServerConfiguration conf, RegistrationListener listener)
+            throws InterruptedException, KeeperException, IOException {
+        final Watcher sessionWatcher = event -> {
+            // do nothing until first registration
+            if (running) {
+                // Check for expired connection.
+                final boolean sessionEvent = event.getType().equals(EventType.None);
+                if (sessionEvent && event.getState().equals(KeeperState.Expired)) {
+                    listener.onRegistrationExpired();
+                }
+            }
+        };
+        final Set<Watcher> watchers = new HashSet<>();
+        watchers.add(sessionWatcher);
+
+        return ZooKeeperClient.newBuilder()
+                .connectString(conf.getZkServers())
+                .sessionTimeoutMs(conf.getZkTimeout())
+                .watchers(watchers)
+                .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(
+                        conf.getZkRetryBackoffStartMs(),
+                        conf.getZkRetryBackoffMaxMs(),
+                        Integer.MAX_VALUE))
+                .requestRateLimit(conf.getZkRequestRateLimit())
+                .statsLogger(this.statsLogger.scope(BOOKIE_SCOPE))
+                .build();
+    }
+
+    /**
+     * Close the underlying zookeeper client, if one was created.
+     *
+     * <p>An interruption while closing is logged, and the thread's interrupt status is
+     * restored so that callers can still observe it.
+     */
+    @Override
+    public void close() {
+        if (null == zk) {
+            return;
+        }
+        try {
+            zk.close();
+        } catch (InterruptedException e) {
+            // do not swallow the interruption: re-assert it for the caller
+            Thread.currentThread().interrupt();
+            log.warn("Interrupted on closing zookeeper client", e);
+        }
+    }
+
+    /**
+     * Build the znode path holding the cookie of <i>bookieId</i>.
+     *
+     * @param bookieId bookie id
+     * @return the cookie root path joined with the bookie id
+     */
+    private String getCookiePath(String bookieId) {
+        return String.join("/", this.cookiePath, bookieId);
+    }
+
+ //
+ // Registration Management
+ //
+
+    /**
+     * Check existence of <i>regPath</i> and, when it is owned by another zookeeper session,
+     * wait up to the configured zk session timeout for it to be deleted.
+     *
+     * @param regPath reg node path.
+     * @return true if regPath exists and is owned by the current zookeeper session; false if
+     *         it does not exist, or if it was owned by another session and got deleted while waiting
+     * @throws IOException if the zookeeper call fails, if the wait is interrupted, or if the node
+     *         owned by another session is still there after the timeout (the cause is then a
+     *         {@link NodeExistsException})
+     */
+    protected boolean checkRegNodeAndWaitExpired(String regPath) throws IOException {
+        final CountDownLatch prevNodeLatch = new CountDownLatch(1);
+        // Check for prev znode deletion. Connection expiration is
+        // not handled here, since bookie has logic to shutdown.
+        Watcher zkPrevRegNodeWatcher = (WatchedEvent event) -> {
+            if (EventType.NodeDeleted == event.getType()) {
+                prevNodeLatch.countDown();
+            }
+        };
+        try {
+            Stat stat = zk.exists(regPath, zkPrevRegNodeWatcher);
+            if (null == stat) {
+                return false;
+            }
+            if (stat.getEphemeralOwner() == zk.getSessionId()) {
+                // the node was created by this very session: nothing to wait for
+                return true;
+            }
+            // the ephemeral owner isn't the current zookeeper client, wait for it to be expired.
+            log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:"
+                    + " {} ms for znode deletion", regPath, conf.getZkTimeout());
+            if (!prevNodeLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)) {
+                // thrown inside the try on purpose: it is wrapped into the IOException below
+                throw new NodeExistsException(regPath);
+            }
+            return false;
+        } catch (KeeperException ke) {
+            log.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke);
+            throw new IOException("ZK exception checking and wait ephemeral znode "
+                    + regPath + " expired", ke);
+        } catch (InterruptedException ie) {
+            // keep the interruption visible to callers even though it is reported as IOException
+            Thread.currentThread().interrupt();
+            log.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie);
+            throw new IOException("Interrupted checking and wait ephemeral znode "
+                    + regPath + " expired", ie);
+        }
+    }
+
+    /**
+     * Register the bookie as writable or readonly in zookeeper.
+     *
+     * <p>After the first successful registration the <i>running</i> flag is raised, which is
+     * what allows the session watcher to report session expiration to the registration
+     * listener. Nothing else in this class sets that flag.
+     *
+     * @param bookieId bookie id
+     * @param readOnly {@code true} to register under the readonly path, {@code false} to
+     *                 register under the writable path
+     * @throws BookieException when the registration fails
+     */
+    @Override
+    public void registerBookie(String bookieId, boolean readOnly) throws BookieException {
+        if (readOnly) {
+            doRegisterReadOnlyBookie(bookieId);
+        } else {
+            doRegisterBookie(bookieRegistrationPath + "/" + bookieId);
+        }
+        // only reached when registration succeeded; from now on session expiration matters
+        running = true;
+    }
+
+    /**
+     * Create the ephemeral registration znode <i>regPath</i> unless the current session
+     * already owns it.
+     *
+     * @param regPath full path of the registration znode
+     * @throws BookieException (a {@link MetadataStoreException}) when the zookeeper operation
+     *         fails, is interrupted, or a node owned by another session does not expire in time
+     */
+    private void doRegisterBookie(String regPath) throws BookieException {
+        try {
+            if (!checkRegNodeAndWaitExpired(regPath)) {
+                // Create the ZK ephemeral node for this Bookie.
+                zk.create(regPath, new byte[0], zkAcls, CreateMode.EPHEMERAL);
+            }
+        } catch (KeeperException ke) {
+            log.error("ZK exception registering ephemeral Znode for Bookie!", ke);
+            // Report the failure as a MetadataStoreException; the caller treats a failed
+            // registration as fatal.
+            throw new MetadataStoreException(ke);
+        } catch (InterruptedException ie) {
+            // keep the interruption visible to callers even though it is wrapped
+            Thread.currentThread().interrupt();
+            log.error("Interrupted exception registering ephemeral Znode for Bookie!", ie);
+            throw new MetadataStoreException(ie);
+        } catch (IOException e) {
+            throw new MetadataStoreException(e);
+        }
+    }
+
+    /**
+     * Register the bookie under the readonly path and then drop its writable registration.
+     *
+     * <p>The readonly parent znode is created on demand. The readonly node is registered
+     * before the writable node is deleted. A missing writable node is only logged.
+     *
+     * @param bookieId bookie id
+     * @throws BookieException when a zookeeper operation fails or is interrupted
+     */
+    private void doRegisterReadOnlyBookie(String bookieId) throws BookieException {
+        final String readonlyRegPath = bookieReadonlyRegistrationPath + "/" + bookieId;
+        final String writableRegPath = bookieRegistrationPath + "/" + bookieId;
+        try {
+            final boolean readonlyParentMissing =
+                    zk.exists(this.bookieReadonlyRegistrationPath, false) == null;
+            if (readonlyParentMissing) {
+                try {
+                    zk.create(this.bookieReadonlyRegistrationPath, new byte[0],
+                            zkAcls, CreateMode.PERSISTENT);
+                } catch (NodeExistsException ignored) {
+                    // this node is just now created by someone.
+                }
+            }
+
+            doRegisterBookie(readonlyRegPath);
+
+            // clear the write state
+            try {
+                zk.delete(writableRegPath, -1);
+            } catch (KeeperException.NoNodeException nne) {
+                log.warn("No writable bookie registered node {} when transitioning to readonly",
+                        writableRegPath, nne);
+            }
+        } catch (KeeperException | InterruptedException e) {
+            throw new MetadataStoreException(e);
+        }
+    }
+
+    /**
+     * Delete the registration znode of <i>bookieId</i>.
+     *
+     * @param bookieId bookie id
+     * @param readOnly {@code true} to delete the node under the readonly path, {@code false}
+     *                 to delete the node under the writable path
+     * @throws BookieException when the znode cannot be deleted
+     */
+    @Override
+    public void unregisterBookie(String bookieId, boolean readOnly) throws BookieException {
+        final String parentPath = readOnly ? bookieReadonlyRegistrationPath : bookieRegistrationPath;
+        doUnregisterBookie(parentPath + "/" + bookieId);
+    }
+
+    /**
+     * Delete <i>regPath</i> regardless of its version.
+     *
+     * @param regPath full path of the registration znode
+     * @throws BookieException when the delete fails or is interrupted
+     */
+    private void doUnregisterBookie(String regPath) throws BookieException {
+        final int anyVersion = -1;
+        try {
+            zk.delete(regPath, anyVersion);
+        } catch (InterruptedException | KeeperException cause) {
+            throw new MetadataStoreException(cause);
+        }
+    }
+
+ //
+ // Cookie Management
+ //
+
+    /**
+     * Store the cookie of <i>bookieId</i> in zookeeper.
+     *
+     * <p>With {@link Version#NEW} the cookie znode is created (together with the cookie root
+     * znode when it is missing). With any other version the existing znode is updated
+     * conditionally on that version, which must be a {@link LongVersion}.
+     *
+     * @param bookieId bookie id
+     * @param cookieData cookie bytes plus the version the write is made against
+     * @throws BookieException a {@link BookieIllegalOpException} when the version is neither
+     *         {@code Version.NEW} nor a {@code LongVersion}; a {@link MetadataStoreException}
+     *         (carrying the zookeeper failure as its cause) when the write fails
+     */
+    @Override
+    public void writeCookie(String bookieId,
+                            Versioned<byte[]> cookieData) throws BookieException {
+        String zkPath = getCookiePath(bookieId);
+        try {
+            if (Version.NEW == cookieData.getVersion()) {
+                if (zk.exists(cookiePath, false) == null) {
+                    try {
+                        zk.create(cookiePath, new byte[0], zkAcls, CreateMode.PERSISTENT);
+                    } catch (NodeExistsException nne) {
+                        log.info("More than one bookie tried to create {} at once. Safe to ignore.",
+                                cookiePath);
+                    }
+                }
+                zk.create(zkPath, cookieData.getValue(), zkAcls, CreateMode.PERSISTENT);
+            } else {
+                if (!(cookieData.getVersion() instanceof LongVersion)) {
+                    throw new BookieIllegalOpException("Invalid version type, expected it to be LongVersion");
+                }
+                zk.setData(
+                        zkPath,
+                        cookieData.getValue(),
+                        (int) ((LongVersion) cookieData.getVersion()).getLongVersion());
+            }
+        } catch (InterruptedException | KeeperException e) {
+            // keep the zookeeper failure as the cause so the reason is not lost
+            throw new MetadataStoreException("Failed to write cookie for bookie " + bookieId, e);
+        }
+    }
+
+    /**
+     * Read the cookie of <i>bookieId</i> from zookeeper.
+     *
+     * @param bookieId bookie id
+     * @return the cookie bytes, versioned with the znode version they were read at
+     * @throws BookieException a {@link CookieNotFoundException} when the cookie znode does not
+     *         exist; a {@link MetadataStoreException} (carrying the zookeeper failure as its
+     *         cause) when the read fails
+     */
+    @Override
+    public Versioned<byte[]> readCookie(String bookieId) throws BookieException {
+        String zkPath = getCookiePath(bookieId);
+        try {
+            // getData fills in the stat of the znode it read, so a single call yields both
+            // the data and its version; no prior exists() call (and no null stat) is involved
+            Stat stat = new Stat();
+            byte[] data = zk.getData(zkPath, false, stat);
+            // sets stat version from ZooKeeper
+            LongVersion version = new LongVersion(stat.getVersion());
+            return new Versioned<>(data, version);
+        } catch (NoNodeException nne) {
+            throw new CookieNotFoundException(bookieId);
+        } catch (KeeperException | InterruptedException e) {
+            // keep the zookeeper failure as the cause so the reason is not lost
+            throw new MetadataStoreException("Failed to read cookie for bookie " + bookieId, e);
+        }
+    }
+
+    /**
+     * Delete the cookie of <i>bookieId</i>, conditionally on <i>version</i>.
+     *
+     * @param bookieId bookie id
+     * @param version version the cookie znode is expected to be at; must be a {@link LongVersion}
+     * @throws BookieException a {@link BookieIllegalOpException} when the version is not a
+     *         {@code LongVersion}; a {@link CookieNotFoundException} when the cookie znode does
+     *         not exist; a {@link MetadataStoreException} (carrying the zookeeper failure as its
+     *         cause) when the delete fails
+     */
+    @Override
+    public void removeCookie(String bookieId, Version version) throws BookieException {
+        // same check as in writeCookie, instead of failing with a ClassCastException
+        if (!(version instanceof LongVersion)) {
+            throw new BookieIllegalOpException("Invalid version type, expected it to be LongVersion");
+        }
+        String zkPath = getCookiePath(bookieId);
+        try {
+            zk.delete(zkPath, (int) ((LongVersion) version).getLongVersion());
+        } catch (NoNodeException e) {
+            throw new CookieNotFoundException(bookieId);
+        } catch (InterruptedException | KeeperException e) {
+            // keep the zookeeper failure as the cause so the reason is not lost
+            throw new MetadataStoreException("Failed to delete cookie for bookie " + bookieId, e);
+        }
+
+        log.info("Removed cookie from {} for bookie {}.", cookiePath, bookieId);
+    }
+
+
+    /**
+     * Read the cluster instance id stored under the ledgers root path.
+     *
+     * @return the instance id decoded as UTF-8, or {@code null} when the instance id znode
+     *         does not exist
+     * @throws BookieException when the ledgers root path does not exist or zookeeper cannot be read
+     */
+    @Override
+    public String getClusterInstanceId() throws BookieException {
+        final String ledgersRootPath = conf.getZkLedgersRootPath();
+        try {
+            if (zk.exists(ledgersRootPath, null) == null) {
+                log.error("BookKeeper metadata doesn't exist in zookeeper. "
+                        + "Has the cluster been initialized? "
+                        + "Try running bin/bookkeeper shell metaformat");
+                throw new KeeperException.NoNodeException("BookKeeper metadata");
+            }
+            try {
+                final byte[] rawInstanceId = zk.getData(ledgersRootPath + "/" + INSTANCEID, false, null);
+                return new String(rawInstanceId, UTF_8);
+            } catch (KeeperException.NoNodeException e) {
+                log.info("INSTANCEID not exists in zookeeper. Not considering it for data verification");
+                return null;
+            }
+        } catch (KeeperException | InterruptedException e) {
+            throw new MetadataStoreException("Failed to get cluster instance id", e);
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/package-info.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/package-info.java
new file mode 100644
index 0000000..ecc26f5
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes related to service discovery.
+ */
+package org.apache.bookkeeper.discover;
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ExpandStorageService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ExpandStorageService.java
index e51ca12..cd87a24 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ExpandStorageService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ExpandStorageService.java
@@ -29,9 +29,12 @@ import java.util.List;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
import org.apache.bookkeeper.http.service.HttpEndpointService;
import org.apache.bookkeeper.http.service.HttpServiceRequest;
import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,9 +83,11 @@ public class ExpandStorageService implements HttpEndpointService {
}
try {
- Bookie.checkEnvironmentWithStorageExpansion(conf, zk,
+ RegistrationManager rm = new ZKRegistrationManager();
+ rm.initialize(conf, () -> { }, NullStatsLogger.INSTANCE);
+ Bookie.checkEnvironmentWithStorageExpansion(conf, rm,
Lists.newArrayList(journalDirectories), allLedgerDirs);
- } catch (BookieException | IOException e) {
+ } catch (BookieException e) {
LOG.error("Exception occurred while updating cookie for storage expansion", e);
response.setCode(HttpServer.StatusCode.INTERNAL_ERROR);
response.setBody("Exception while updating cookie for storage expansion");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/RecoveryBookieService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/RecoveryBookieService.java
index 74ffa33..b079f1b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/RecoveryBookieService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/RecoveryBookieService.java
@@ -27,11 +27,14 @@ import org.apache.bookkeeper.bookie.Cookie;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.http.service.HttpEndpointService;
import org.apache.bookkeeper.http.service.HttpServiceRequest;
import org.apache.bookkeeper.http.service.HttpServiceResponse;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.JsonUtil;
+import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,7 +106,10 @@ public class RecoveryBookieService implements HttpEndpointService {
if (HttpServer.Method.PUT == request.getMethod() &&
!requestJsonBody.bookie_src.isEmpty()) {
- ClientConfiguration adminConf = new ClientConfiguration(conf);
+
+ Class<? extends RegistrationManager> rmClass = conf.getRegistrationManagerClass();
+ RegistrationManager rm = ReflectionUtils.newInstance(rmClass);
+ rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
String bookieSrcString[] = requestJsonBody.bookie_src.get(0).split(":");
BookieSocketAddress bookieSrc = new BookieSocketAddress(
@@ -122,8 +128,8 @@ public class RecoveryBookieService implements HttpEndpointService {
LOG.info("Start recovering bookie.");
bka.recoverBookieData(bookieSrc, bookieDest);
if (deleteCookie) {
- Versioned<Cookie> cookie = Cookie.readFromZooKeeper(bka.getZooKeeper(), adminConf, bookieSrc);
- cookie.getValue().deleteFromZooKeeper(bka.getZooKeeper(), adminConf, bookieSrc, cookie.getVersion());
+ Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, bookieSrc);
+ cookie.getValue().deleteFromRegistrationManager(rm, bookieSrc, cookie.getVersion());
}
LOG.info("Complete recovering bookie");
} catch (Exception e) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 135736f..3783c57 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -38,6 +38,8 @@ import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
@@ -49,6 +51,7 @@ import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Assert;
import org.junit.Rule;
@@ -66,10 +69,11 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
@Rule
public final TestName runtime = new TestName();
+ RegistrationManager rm;
public BookieInitializationTest() {
super(0);
- String ledgersPath = "/" + runtime.getMethodName();
+ String ledgersPath = "/" + "ledgers" + runtime.getMethodName();
baseClientConf.setZkLedgersRootPath(ledgersPath);
baseConf.setZkLedgersRootPath(ledgersPath);
}
@@ -78,6 +82,15 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
public void setUp() throws Exception {
super.setUp();
zkUtil.createBKEnsemble("/" + runtime.getMethodName());
+ rm = new ZKRegistrationManager();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if(rm != null) {
+ rm.close();
+ }
}
private static class MockBookie extends Bookie {
@@ -109,8 +122,10 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
throws IOException, KeeperException, InterruptedException,
BookieException {
MockBookie bookie = new MockBookie(conf);
- bookie.zk = zkc;
- zkc.close();
+ rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ bookie.registrationManager = rm;
+ ((ZKRegistrationManager) bookie.registrationManager).setZk(zkc);
+ ((ZKRegistrationManager) bookie.registrationManager).getZk().close();
return bookie;
}
};
@@ -134,15 +149,19 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
+ conf.getBookiePort();
MockBookie b = new MockBookie(conf);
- b.zk = zkc;
+ conf.setZkServers(zkUtil.getZooKeeperConnectString());
+ rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ b.registrationManager = rm;
+
b.testRegisterBookie(conf);
+ ZooKeeper zooKeeper = ((ZKRegistrationManager) rm).getZk();
Assert.assertNotNull("Bookie registration node doesn't exists!",
- zkc.exists(bkRegPath, false));
+ zooKeeper.exists(bkRegPath, false));
// test register bookie again if the registeration node is created by itself.
b.testRegisterBookie(conf);
Assert.assertNotNull("Bookie registration node doesn't exists!",
- zkc.exists(bkRegPath, false));
+ zooKeeper.exists(bkRegPath, false));
}
/**
@@ -161,18 +180,24 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
final String bkRegPath = conf.getZkAvailableBookiesPath() + "/"
+ InetAddress.getLocalHost().getHostAddress() + ":"
+ conf.getBookiePort();
-
MockBookie b = new MockBookie(conf);
- b.zk = zkc;
+
+ conf.setZkServers(zkUtil.getZooKeeperConnectString());
+ rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ b.registrationManager = rm;
+
b.testRegisterBookie(conf);
- Stat bkRegNode1 = zkc.exists(bkRegPath, false);
+
+ Stat bkRegNode1 = ((ZKRegistrationManager) rm).getZk().exists(bkRegPath, false);
Assert.assertNotNull("Bookie registration node doesn't exists!",
bkRegNode1);
// simulating bookie restart, on restart bookie will create new
// zkclient and doing the registration.
ZooKeeperClient newZk = createNewZKClient();
- b.zk = newZk;
+ RegistrationManager newRm = new ZKRegistrationManager();
+ newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ b.registrationManager = newRm;
try {
// deleting the znode, so that the bookie registration should
@@ -228,7 +253,11 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
+ conf.getBookiePort();
MockBookie b = new MockBookie(conf);
- b.zk = zkc;
+
+ conf.setZkServers(zkUtil.getZooKeeperConnectString());
+ rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ b.registrationManager = rm;
+
b.testRegisterBookie(conf);
Stat bkRegNode1 = zkc.exists(bkRegPath, false);
Assert.assertNotNull("Bookie registration node doesn't exists!",
@@ -237,14 +266,19 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
// simulating bookie restart, on restart bookie will create new
// zkclient and doing the registration.
ZooKeeperClient newzk = createNewZKClient();
- b.zk = newzk;
+ RegistrationManager newRm = new ZKRegistrationManager();
+ newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ b.registrationManager = newRm;
try {
b.testRegisterBookie(conf);
fail("Should throw NodeExistsException as the znode is not getting expired");
} catch (IOException e) {
- Throwable t = e.getCause();
- if (t instanceof KeeperException) {
- KeeperException ke = (KeeperException) t;
+ Throwable t1 = e.getCause(); // BookieException.MetadataStoreException
+ Throwable t2 = t1.getCause(); // IOException
+ Throwable t3 = t2.getCause(); // KeeperException.NodeExistsException
+
+ if (t3 instanceof KeeperException) {
+ KeeperException ke = (KeeperException) t3;
Assert.assertTrue("ErrorCode:" + ke.code()
+ ", Registration node doesn't exists",
ke.code() == KeeperException.Code.NODEEXISTS);
@@ -263,6 +297,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
throw e;
} finally {
newzk.close();
+ newRm.close();
}
}
@@ -280,11 +315,17 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
tmpDir.getPath()).setLedgerDirNames(
new String[] { tmpDir.getPath() });
BookieServer bs1 = new BookieServer(conf);
+ conf.setZkServers(zkUtil.getZooKeeperConnectString());
+ rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ bs1.getBookie().setRegistrationManager(rm);
bs1.start();
// starting bk server with same conf
try {
BookieServer bs2 = new BookieServer(conf);
+ RegistrationManager newRm = new ZKRegistrationManager();
+ newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ bs2.getBookie().registrationManager = newRm;
bs2.start();
fail("Should throw BindException, as the bk server is already running!");
} catch (BindException e) {
@@ -313,6 +354,9 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
ServerConfiguration conf1 = new ServerConfiguration();
conf1.addConfiguration(conf);
BookieServer bs1 = new BookieServer(conf1);
+ conf.setZkServers(zkUtil.getZooKeeperConnectString());
+ rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ bs1.getBookie().registrationManager = rm;
bs1.start();
assertFalse(0 == conf1.getBookiePort());
@@ -320,6 +364,9 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
ServerConfiguration conf2 = new ServerConfiguration();
conf2.addConfiguration(conf);
BookieServer bs2 = new BookieServer(conf2);
+ RegistrationManager newRm = new ZKRegistrationManager();
+ newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ bs2.getBookie().registrationManager = newRm;
bs2.start();
assertFalse(0 == conf2.getBookiePort());
@@ -343,7 +390,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
try {
new Bookie(conf);
fail("Should throw ConnectionLossException as ZKServer is not running!");
- } catch (KeeperException.ConnectionLossException e) {
+ } catch (BookieException.MetadataStoreException e) {
// expected behaviour
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
index f622448..e817776 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
@@ -28,16 +28,21 @@ import static org.apache.bookkeeper.bookie.UpgradeTest.newV2LedgerDirectory;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.IOUtils;
@@ -45,6 +50,7 @@ import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.io.FileUtils;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -68,6 +74,24 @@ public class CookieTest extends BookKeeperClusterTestCase {
return d.getPath();
}
+ RegistrationManager rm;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ rm = new ZKRegistrationManager();
+ baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+ rm.initialize(baseConf, () -> {}, NullStatsLogger.INSTANCE);
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if(rm != null) {
+ rm.close();
+ }
+ }
+
/**
* Test starting bookie with clean state.
*/
@@ -98,7 +122,7 @@ public class CookieTest extends BookKeeperClusterTestCase {
.setBookiePort(bookiePort);
Cookie.Builder cookieBuilder = Cookie.generateCookie(conf1);
Cookie c = cookieBuilder.build();
- c.writeToZooKeeper(zkc, conf1, Version.NEW);
+ c.writeToRegistrationManager(rm, conf1, Version.NEW);
String journalDir = newDirectory();
String ledgerDir = newDirectory();
@@ -590,15 +614,15 @@ public class CookieTest extends BookKeeperClusterTestCase {
Bookie b = new Bookie(conf); // should work fine
b.start();
b.shutdown();
- Versioned<Cookie> zkCookie = Cookie.readFromZooKeeper(zkc, conf);
+ Versioned<Cookie> zkCookie = Cookie.readFromRegistrationManager(rm, conf);
Version version1 = zkCookie.getVersion();
Assert.assertTrue("Invalid type expected ZkVersion type",
version1 instanceof LongVersion);
LongVersion zkVersion1 = (LongVersion) version1;
Cookie cookie = zkCookie.getValue();
- cookie.writeToZooKeeper(zkc, conf, version1);
+ cookie.writeToRegistrationManager(rm, conf, version1);
- zkCookie = Cookie.readFromZooKeeper(zkc, conf);
+ zkCookie = Cookie.readFromRegistrationManager(rm, conf);
Version version2 = zkCookie.getVersion();
Assert.assertTrue("Invalid type expected ZkVersion type",
version2 instanceof LongVersion);
@@ -620,8 +644,8 @@ public class CookieTest extends BookKeeperClusterTestCase {
Bookie b = new Bookie(conf); // should work fine
b.start();
b.shutdown();
- Versioned<Cookie> zkCookie = Cookie.readFromZooKeeper(zkc, conf);
+ Versioned<Cookie> zkCookie = Cookie.readFromRegistrationManager(rm, conf);
Cookie cookie = zkCookie.getValue();
- cookie.deleteFromZooKeeper(zkc, conf, zkCookie.getVersion());
+ cookie.deleteFromRegistrationManager(rm, conf, zkCookie.getVersion());
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java
index f5b646e..c11fbca 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java
@@ -20,17 +20,21 @@
*/
package org.apache.bookkeeper.bookie;
+import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
+
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.List;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.Assert;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
@@ -44,10 +48,29 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
private final static Logger LOG = LoggerFactory.getLogger(UpdateCookieCmdTest.class);
+ RegistrationManager rm;
+
public UpdateCookieCmdTest() {
super(1);
}
+ /**
+ * Brings up the base test fixture, then opens the registration manager
+ * that the cookie checks in this class read from and write to.
+ */
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ LOG.info("setUp ZKRegistrationManager");
+ // Point the shared configuration at the connect string reported by
+ // zkUtil before the manager is initialized from it.
+ baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+ rm = new ZKRegistrationManager();
+ // The second argument is a no-op callback (presumably the registration
+ // listener -- confirm against RegistrationManager#initialize).
+ rm.initialize(baseConf, () -> {}, NullStatsLogger.INSTANCE);
+ }
+
+ /**
+ * Releases the registration manager opened in {@link #setUp()} and then
+ * runs the superclass teardown.
+ *
+ * The manager is closed first because it was initialized against the
+ * connect string supplied by zkUtil; the superclass teardown presumably
+ * stops that ZooKeeper instance (confirm in BookKeeperClusterTestCase).
+ * The try/finally guarantees the superclass teardown still runs when
+ * closing the manager fails.
+ */
+ @Override
+ public void tearDown() throws Exception {
+ try {
+ if (rm != null) {
+ rm.close();
+ }
+ } finally {
+ super.tearDown();
+ }
+ }
+
/**
* updatecookie to hostname
*/
@@ -109,12 +132,12 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
// creates cookie with ipaddress
ServerConfiguration conf = bsConfs.get(0);
conf.setUseHostNameAsBookieID(true); // sets to hostname
- Cookie cookie = Cookie.readFromZooKeeper(zkc, conf).getValue();
+ Cookie cookie = Cookie.readFromRegistrationManager(rm, conf).getValue();
Cookie.Builder cookieBuilder = Cookie.newBuilder(cookie);
conf.setUseHostNameAsBookieID(false); // sets to ipaddress
final String newBookieHost = Bookie.getBookieAddress(conf).toString();
cookieBuilder.setBookieHost(newBookieHost);
- cookieBuilder.build().writeToZooKeeper(zkc, conf, Version.NEW);
+ cookieBuilder.build().writeToRegistrationManager(rm, conf, Version.NEW);
verifyCookieInZooKeeper(conf, 2);
// again issue hostname cmd
@@ -125,7 +148,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
Assert.assertEquals("Failed to return the error code!", 0, bkShell.run(argv));
conf.setUseHostNameAsBookieID(true);
- cookie = Cookie.readFromZooKeeper(zkc, conf).getValue();
+ cookie = Cookie.readFromRegistrationManager(rm, conf).getValue();
Assert.assertFalse("Cookie has created with IP!", cookie.isBookieHostCreatedFromIp());
// ensure the old cookie is deleted
verifyCookieInZooKeeper(conf, 1);
@@ -148,7 +171,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
BookieServer bks = bs.get(0);
bks.shutdown();
- String zkCookiePath = Cookie.getZkPath(conf);
+ String zkCookiePath = conf.getZkLedgersRootPath() + "/" + COOKIE_NODE + "/" + Bookie.getBookieAddress(conf);
Assert.assertNotNull("Cookie path doesn't still exists!", zkc.exists(zkCookiePath, false));
zkc.delete(zkCookiePath, -1);
Assert.assertNull("Cookie path still exists!", zkc.exists(zkCookiePath, false));
@@ -163,7 +186,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
private void verifyCookieInZooKeeper(ServerConfiguration conf, int expectedCount) throws KeeperException,
InterruptedException {
List<String> cookies;
- String bookieCookiePath1 = conf.getZkLedgersRootPath() + "/" + BookKeeperConstants.COOKIE_NODE;
+ String bookieCookiePath1 = conf.getZkLedgersRootPath() + "/" + COOKIE_NODE;
cookies = zkc.getChildren(bookieCookiePath1, false);
Assert.assertEquals("Wrongly updated the cookie!", expectedCount, cookies.size());
}
@@ -174,7 +197,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
bks.shutdown();
conf.setUseHostNameAsBookieID(!useHostNameAsBookieID);
- Cookie cookie = Cookie.readFromZooKeeper(zkc, conf).getValue();
+ Cookie cookie = Cookie.readFromRegistrationManager(rm, conf).getValue();
final boolean previousBookieID = cookie.isBookieHostCreatedFromIp();
Assert.assertEquals("Wrong cookie!", useHostNameAsBookieID, previousBookieID);
@@ -187,7 +210,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
Assert.assertEquals("Failed to return exit code!", 0, bkShell.run(argv));
newconf.setUseHostNameAsBookieID(useHostNameAsBookieID);
- cookie = Cookie.readFromZooKeeper(zkc, newconf).getValue();
+ cookie = Cookie.readFromRegistrationManager(rm, newconf).getValue();
Assert.assertEquals("Wrongly updated cookie!", previousBookieID, !cookie.isBookieHostCreatedFromIp());
Assert.assertEquals("Wrongly updated cookie!", useHostNameAsBookieID, !cookie.isBookieHostCreatedFromIp());
verifyCookieInZooKeeper(newconf, 1);
--
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].