You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2019/12/12 14:51:36 UTC
[bookkeeper] 01/02: first implementation
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch fix/endpoint-discovery-impl
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 2b6b4e9b490f9166493b15b31e985e3ee1fef076
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Dec 12 15:31:31 2019 +0100
first implementation
---
.../bookkeeper/discover/BookieServiceInfo.java | 18 ++++++-
.../bookkeeper/discover/RegistrationClient.java | 8 ++-
.../bookkeeper/discover/ZKRegistrationClient.java | 63 ++++++++++++++++++----
.../bookkeeper/discover/ZKRegistrationManager.java | 7 +--
4 files changed, 81 insertions(+), 15 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java
index 0ce6add..2dc0ad8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java
@@ -18,6 +18,7 @@
package org.apache.bookkeeper.discover;
+import java.util.Collections;
import java.util.Iterator;
/**
@@ -26,9 +27,24 @@ import java.util.Iterator;
public interface BookieServiceInfo {
/**
+ * Default empty implementation.
+ */
+ static BookieServiceInfo EMPTY = new BookieServiceInfo() {
+     // No entry is ever present, so every lookup falls back to the caller's default.
+     @Override
+     public String get(String key, String defaultValue) {
+         return defaultValue;
+     }
+
+     // There is nothing to enumerate.
+     @Override
+     public Iterator<String> keys() {
+         return Collections.emptyIterator();
+     }
+ };
+
+ /**
* List all available entries.
* Remove operation is not supported.
- * @return
+ * @return an iterator over the keys of all available entries
*/
Iterator<String> keys();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
index 16ee421..2fbb47a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
@@ -22,7 +22,9 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
/**
@@ -64,7 +66,7 @@ public interface RegistrationClient extends AutoCloseable {
* @return a future represents the list of readonly bookies.
*/
CompletableFuture<Versioned<Set<BookieSocketAddress>>> getReadOnlyBookies();
-
+
/**
* Get detailed information about the services exposed by a Bookie.
* For old bookies it is expected to return an empty BookieServiceInfo structure.
@@ -74,7 +76,9 @@ public interface RegistrationClient extends AutoCloseable {
*
* @since 4.11
*/
- CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(String bookieId);
+ default CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(String bookieId) {
+     // Fallback for implementations that do not override this method:
+     // an already-available future carrying the empty structure at version -1.
+     final Versioned<BookieServiceInfo> emptyInfo =
+             new Versioned<>(BookieServiceInfo.EMPTY, new LongVersion(-1));
+     return FutureUtils.value(emptyInfo);
+ }
/**
* Watch the changes of bookies.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
index 9586dc4..c7cfd69 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
@@ -18,14 +18,19 @@
package org.apache.bookkeeper.discover;
+import com.fasterxml.jackson.databind.ObjectMapper;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
import com.google.common.collect.Sets;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -46,6 +51,8 @@ import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Version.Occurred;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import static org.apache.zookeeper.KeeperException.Code.OK;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
@@ -59,6 +66,8 @@ import org.apache.zookeeper.data.Stat;
@Slf4j
public class ZKRegistrationClient implements RegistrationClient {
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
static final int ZK_CONNECT_BACKOFF_MS = 200;
class WatchTask
@@ -218,19 +227,55 @@ public class ZKRegistrationClient implements RegistrationClient {
public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(String bookieId) {
String pathAsWritable = bookieRegistrationPath + "/" + bookieId;
CompletableFuture<Versioned<BookieServiceInfo>> res = new CompletableFuture<>();
- zk.getData(pathAsWritable, false, new AsyncCallback.DataCallback() {
- @Override
- public void processResult(int rc, String string, Object o, byte[] bytes, Stat stat) {
- if (Code.OK == rc) {
- BookieServiceInfo bookieServiceInfo = deserializeBookieService
- res.complete(new Versioned<>())
- }
+ // Asynchronous two-step lookup: the writable registration is tried first, then the
+ // readonly one. 'res' is completed from within the ZooKeeper callbacks.
+ zk.getData(pathAsWritable, false, (int rc, String path, Object o, byte[] bytes, Stat stat) -> {
+ if (KeeperException.Code.OK.intValue() == rc) {
+ BookieServiceInfo bookieServiceInfo = deserializeBookieService(bytes);
+ // The payload is the znode data, so it is versioned with the data version;
+ // cversion only counts changes to the children of the znode.
+ res.complete(new Versioned<>(bookieServiceInfo, new LongVersion(stat.getVersion())));
+ } else if (KeeperException.Code.NONODE.intValue() == rc) {
+ // not found, looking for a readonly bookie
+ String pathAsReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
+ zk.getData(pathAsReadonly, false, (int rc2, String path2, Object o2, byte[] bytes2, Stat stat2) -> {
+ if (KeeperException.Code.OK.intValue() == rc2) {
+ BookieServiceInfo bookieServiceInfo = deserializeBookieService(bytes2);
+ // Same as above: use the data version of the readonly znode.
+ res.complete(new Versioned<>(bookieServiceInfo, new LongVersion(stat2.getVersion())));
+ } else if (KeeperException.Code.NONODE.intValue() == rc2) {
+ // not found as readonly either, the bookie is offline:
+ // return an empty BookieServiceInfo structure
+ res.complete(new Versioned<>(deserializeBookieService(null), new LongVersion(0)));
+ } else {
+ // any other ZooKeeper error on the readonly lookup fails the future
+ res.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc2), path2));
+ }
+ }, null);
+ } else {
+ // any other ZooKeeper error on the writable lookup fails the future
+ res.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
}
}, null);
return res;
}
-
-
+
+ /**
+  * Decode the payload of a bookie registration znode into a {@link BookieServiceInfo}.
+  *
+  * <p>The payload is parsed as a JSON object. A {@code null} or empty payload, a payload
+  * that cannot be parsed, and a payload that decodes to no object all yield an empty
+  * structure; parse failures are logged and never propagated to the caller.
+  *
+  * @param bookieServiceInfo the raw znode data, may be {@code null} or empty
+  * @return a view over the decoded entries, never {@code null}
+  */
+ private static BookieServiceInfo deserializeBookieService(byte[] bookieServiceInfo) {
+     Map<String, String> map = Collections.emptyMap();
+     if (bookieServiceInfo != null && bookieServiceInfo.length > 0) {
+         try {
+             // Map.class can only yield a raw Map; the entries are presumably the flat
+             // string pairs written at registration time (verify against the writer).
+             @SuppressWarnings("unchecked")
+             Map<String, String> parsed = MAPPER.readValue(bookieServiceInfo, Map.class);
+             // A JSON "null" document decodes to null: keep the empty map in that case
+             // instead of failing inside the ZooKeeper callback.
+             if (parsed != null) {
+                 map = Collections.unmodifiableMap(parsed);
+             }
+         } catch (IOException err) {
+             // JSON text is UTF-8 encoded, so decode it as such to log the payload faithfully.
+             log.error("Cannot deserialize bookieServiceInfo from {}",
+                     new String(bookieServiceInfo, StandardCharsets.UTF_8), err);
+         }
+     }
+     final Map<String, String> mapFinal = map;
+     return new BookieServiceInfo() {
+         @Override
+         public Iterator<String> keys() {
+             return mapFinal.keySet().iterator();
+         }
+
+         @Override
+         public String get(String key, String defaultValue) {
+             return mapFinal.getOrDefault(key, defaultValue);
+         }
+     };
+ }
+
private CompletableFuture<Versioned<Set<BookieSocketAddress>>> getChildren(String regPath, Watcher watcher) {
CompletableFuture<Versioned<Set<BookieSocketAddress>>> future = FutureUtils.createFuture();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
index 5255ca2..fe4f3e8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
@@ -83,6 +83,8 @@ import org.apache.zookeeper.data.Stat;
@Slf4j
public class ZKRegistrationManager implements RegistrationManager {
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
private static final Function<Throwable, BKException> EXCEPTION_FUNC = cause -> {
if (cause instanceof BKException) {
log.error("Failed to get bookie list : ", cause);
@@ -228,8 +230,7 @@ public class ZKRegistrationManager implements RegistrationManager {
}
}
- private static final ObjectMapper MAPPER = new ObjectMapper();
- private byte[] serializeBookieServiceInfo(BookieServiceInfo bookieServiceInfo) {
+ private static byte[] serializeBookieServiceInfo(BookieServiceInfo bookieServiceInfo) {
try {
Map<String, String> map = new HashMap<>();
bookieServiceInfo.keys().forEachRemaining(key -> {
@@ -241,7 +242,7 @@ public class ZKRegistrationManager implements RegistrationManager {
return new byte[0];
}
}
-
+
private void doRegisterBookie(String regPath, BookieServiceInfo bookieServiceInfo) throws BookieException {
// ZK ephemeral node for this Bookie.
try {