You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2019/12/12 14:31:53 UTC
[bookkeeper] 01/02: BP-38 new API for Bookie discovery
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch fix/endpoint-discovery-impl
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 0845fdb7d5579bd7a538741fcf8ec9bfaca00277
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Dec 12 14:54:33 2019 +0100
BP-38 new API for Bookie discovery
---
.../bookkeeper/discover/BookieServiceInfo.java | 42 ++++++++++++++++++++++
.../bookkeeper/discover/RegistrationClient.java | 11 ++++++
.../bookkeeper/discover/RegistrationManager.java | 3 +-
.../bookkeeper/discover/ZKRegistrationClient.java | 20 +++++++++++
.../bookkeeper/discover/ZKRegistrationManager.java | 36 ++++++++++++++-----
5 files changed, 103 insertions(+), 9 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java
new file mode 100644
index 0000000..0ce6add
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.discover;
+
+import java.util.Iterator;
+
+/**
+ * Information about services exposed by a Bookie.
+ */
+public interface BookieServiceInfo {
+
+ /**
+ * List all available entries.
+ * Remove operation is not supported.
+ * @return
+ */
+ Iterator<String> keys();
+
+ /**
+ * Return an entry, if the entry is not present the default value will be returned.
+ * @param key the key
+ * @param defaultValue the default value
+ * @return the current mapping, if there is no mapping for key the defaultValue will be returned
+ */
+ String get(String key, String defaultValue);
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
index 4056329..16ee421 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
@@ -64,6 +64,17 @@ public interface RegistrationClient extends AutoCloseable {
* @return a future represents the list of readonly bookies.
*/
CompletableFuture<Versioned<Set<BookieSocketAddress>>> getReadOnlyBookies();
+
+ /**
+ * Get detailed information about the services exposed by a Bookie.
+ * For old bookies it is expected to return an empty BookieServiceInfo structure.
+ *
+ * @param bookieId this is the id of the bookie, it can be computed from a {@link BookieSocketAddress}
+ * @return a future represents the available information.
+ *
+ * @since 4.11
+ */
+ CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(String bookieId);
/**
* Watch the changes of bookies.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
index 3d357d4..e5b0f6a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
@@ -59,9 +59,10 @@ public interface RegistrationManager extends AutoCloseable {
*
* @param bookieId bookie id
* @param readOnly whether to register it as writable or readonly
+ * @param serviceInfo information about services exposed by the Bookie
* @throws BookieException when fail to register a bookie.
*/
- void registerBookie(String bookieId, boolean readOnly) throws BookieException;
+ void registerBookie(String bookieId, boolean readOnly, BookieServiceInfo serviceInfo) throws BookieException;
/**
* Unregistering the bookie server as <i>bookieId</i>.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
index 6629f0c..9586dc4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
@@ -45,11 +45,13 @@ import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Version.Occurred;
import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
/**
* ZooKeeper based {@link RegistrationClient}.
@@ -212,6 +214,24 @@ public class ZKRegistrationClient implements RegistrationClient {
return getChildren(bookieReadonlyRegistrationPath, null);
}
+ @Override
+ public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(String bookieId) {
+ String pathAsWritable = bookieRegistrationPath + "/" + bookieId;
+ CompletableFuture<Versioned<BookieServiceInfo>> res = new CompletableFuture<>();
+ zk.getData(pathAsWritable, false, new AsyncCallback.DataCallback() {
+ @Override
+ public void processResult(int rc, String string, Object o, byte[] bytes, Stat stat) {
+ if (Code.OK == rc) {
+ BookieServiceInfo bookieServiceInfo = deserializeBookieService
+ res.complete(new Versioned<>())
+ }
+ }
+ }, null);
+ return res;
+ }
+
+
+
private CompletableFuture<Versioned<Set<BookieSocketAddress>>> getChildren(String regPath, Watcher watcher) {
CompletableFuture<Versioned<Set<BookieSocketAddress>>> future = FutureUtils.createFuture();
zk.getChildren(regPath, watcher, (rc, path, ctx, children, stat) -> {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
index a2a3e7c..5255ca2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
@@ -18,6 +18,8 @@
package org.apache.bookkeeper.discover;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
@@ -29,11 +31,15 @@ import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
@@ -213,21 +219,35 @@ public class ZKRegistrationManager implements RegistrationManager {
}
@Override
- public void registerBookie(String bookieId, boolean readOnly) throws BookieException {
+ public void registerBookie(String bookieId, boolean readOnly, BookieServiceInfo bookieServiceInfo) throws BookieException {
if (!readOnly) {
String regPath = bookieRegistrationPath + "/" + bookieId;
- doRegisterBookie(regPath);
+ doRegisterBookie(regPath, bookieServiceInfo);
} else {
- doRegisterReadOnlyBookie(bookieId);
+ doRegisterReadOnlyBookie(bookieId, bookieServiceInfo);
}
}
- private void doRegisterBookie(String regPath) throws BookieException {
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ private byte[] serializeBookieServiceInfo(BookieServiceInfo bookieServiceInfo) {
+ try {
+ Map<String, String> map = new HashMap<>();
+ bookieServiceInfo.keys().forEachRemaining(key -> {
+ map.put(key, bookieServiceInfo.get(key, ""));
+ });
+ return MAPPER.writeValueAsBytes(map);
+ } catch (JsonProcessingException ex) {
+ log.error("Cannot serialize bookieServiceInfo {}", bookieServiceInfo, ex);
+ return new byte[0];
+ }
+ }
+
+ private void doRegisterBookie(String regPath, BookieServiceInfo bookieServiceInfo) throws BookieException {
// ZK ephemeral node for this Bookie.
try {
if (!checkRegNodeAndWaitExpired(regPath)) {
// Create the ZK ephemeral node for this Bookie.
- zk.create(regPath, new byte[0], zkAcls, CreateMode.EPHEMERAL);
+ zk.create(regPath, serializeBookieServiceInfo(bookieServiceInfo), zkAcls, CreateMode.EPHEMERAL);
zkRegManagerInitialized = true;
}
} catch (KeeperException ke) {
@@ -248,11 +268,11 @@ public class ZKRegistrationManager implements RegistrationManager {
}
}
- private void doRegisterReadOnlyBookie(String bookieId) throws BookieException {
+ private void doRegisterReadOnlyBookie(String bookieId, BookieServiceInfo bookieServiceInfo) throws BookieException {
try {
if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) {
try {
- zk.create(this.bookieReadonlyRegistrationPath, new byte[0],
+ zk.create(this.bookieReadonlyRegistrationPath, serializeBookieServiceInfo(bookieServiceInfo),
zkAcls, CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// this node is just now created by someone.
@@ -260,7 +280,7 @@ public class ZKRegistrationManager implements RegistrationManager {
}
String regPath = bookieReadonlyRegistrationPath + "/" + bookieId;
- doRegisterBookie(regPath);
+ doRegisterBookie(regPath, bookieServiceInfo);
// clear the write state
regPath = bookieRegistrationPath + "/" + bookieId;
try {