You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2019/12/12 14:31:54 UTC

[bookkeeper] 02/02: first implementation

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch fix/endpoint-discovery-impl
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 2b6b4e9b490f9166493b15b31e985e3ee1fef076
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Dec 12 15:31:31 2019 +0100

    first implementation
---
 .../bookkeeper/discover/BookieServiceInfo.java     | 18 ++++++-
 .../bookkeeper/discover/RegistrationClient.java    |  8 ++-
 .../bookkeeper/discover/ZKRegistrationClient.java  | 63 ++++++++++++++++++----
 .../bookkeeper/discover/ZKRegistrationManager.java |  7 +--
 4 files changed, 81 insertions(+), 15 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java
index 0ce6add..2dc0ad8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.bookkeeper.discover;
 
+import java.util.Collections;
 import java.util.Iterator;
 
 /**
@@ -26,9 +27,24 @@ import java.util.Iterator;
 public interface BookieServiceInfo {
 
     /**
+     * Default empty implementation.
+     */
+    static BookieServiceInfo EMPTY = new BookieServiceInfo() {
+        @Override
+        public Iterator<String> keys() {
+            return Collections.emptyIterator();
+        }
+
+        @Override
+        public String get(String key, String defaultValue) {
+            return defaultValue;
+        }
+    };
+
+    /**
      * List all available entries.
      * Remove operation is not supported.
-     * @return 
+     * @return
      */
     Iterator<String> keys();
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
index 16ee421..2fbb47a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
@@ -22,7 +22,9 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 
 /**
@@ -64,7 +66,7 @@ public interface RegistrationClient extends AutoCloseable {
      * @return a future represents the list of readonly bookies.
      */
     CompletableFuture<Versioned<Set<BookieSocketAddress>>> getReadOnlyBookies();
-    
+
     /**
      * Get detailed information about the services exposed by a Bookie.
      * For old bookies it is expected to return an empty BookieServiceInfo structure.
@@ -74,7 +76,9 @@ public interface RegistrationClient extends AutoCloseable {
      *
      * @since 4.11
      */
-    CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(String bookieId);
+    default CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(String bookieId) {
+        return FutureUtils.value(new Versioned<>(BookieServiceInfo.EMPTY, new LongVersion(-1)));
+    }
 
     /**
      * Watch the changes of bookies.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
index 9586dc4..c7cfd69 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
@@ -18,14 +18,19 @@
 
 package org.apache.bookkeeper.discover;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
 
 import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -46,6 +51,8 @@ import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Version.Occurred;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import static org.apache.zookeeper.KeeperException.Code.OK;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
@@ -59,6 +66,8 @@ import org.apache.zookeeper.data.Stat;
 @Slf4j
 public class ZKRegistrationClient implements RegistrationClient {
 
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
     static final int ZK_CONNECT_BACKOFF_MS = 200;
 
     class WatchTask
@@ -218,19 +227,55 @@ public class ZKRegistrationClient implements RegistrationClient {
     public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(String bookieId) {
         String pathAsWritable = bookieRegistrationPath + "/" + bookieId;
         CompletableFuture<Versioned<BookieServiceInfo>> res = new CompletableFuture<>();
-        zk.getData(pathAsWritable, false, new AsyncCallback.DataCallback() {
-            @Override
-            public void processResult(int rc, String string, Object o, byte[] bytes, Stat stat) {
-                if (Code.OK == rc) {
-                    BookieServiceInfo bookieServiceInfo = deserializeBookieService
-                    res.complete(new Versioned<>())
-                }
+        zk.getData(pathAsWritable, false, (int rc, String path, Object o, byte[] bytes, Stat stat) -> {
+            if (KeeperException.Code.OK.intValue() == rc) {
+                BookieServiceInfo bookieServiceInfo = deserializeBookieService(bytes);
+                res.complete(new Versioned<>(bookieServiceInfo, new LongVersion(stat.getCversion())));
+            } else if (KeeperException.Code.NONODE.intValue() == rc) {
+                // not found, looking for a readonly bookie
+                String pathAsReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
+                zk.getData(pathAsReadonly, false, (int rc2, String path2, Object o2, byte[] bytes2, Stat stat2) -> {
+                    if (KeeperException.Code.OK.intValue() == rc2) {
+                        BookieServiceInfo bookieServiceInfo = deserializeBookieService(bytes2);
+                        res.complete(new Versioned<>(bookieServiceInfo, new LongVersion(stat2.getCversion())));
+                    } else if (KeeperException.Code.NONODE.intValue() == rc2) {
+                        // not found as readonly, the bookie is offline
+                        // return an empty BookieServiceInfoStructure
+                        res.complete(new Versioned<>(deserializeBookieService(null), new LongVersion(0)));
+                    } else {
+                        res.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc2), path2));
+                    }
+                }, null);
+            } else {
+                res.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
             }
         }, null);
         return res;
     }
-    
-    
+
+    private static BookieServiceInfo deserializeBookieService(byte[] bookieServiceInfo) {
+        Map<String, String> map = Collections.emptyMap();
+        if (bookieServiceInfo != null && bookieServiceInfo.length > 0) {
+            try {
+                map = Collections.unmodifiableMap(MAPPER.readValue(bookieServiceInfo, Map.class));
+            } catch (IOException err) {
+                log.error("Cannot deserialize bookieServiceInfo from " + new String(bookieServiceInfo, StandardCharsets.US_ASCII), err);
+            }
+        }
+        final Map<String, String> mapFinal = map;
+        return new BookieServiceInfo() {
+            @Override
+            public Iterator<String> keys() {
+                return mapFinal.keySet().iterator();
+            }
+
+            @Override
+            public String get(String key, String defaultValue) {
+                return mapFinal.getOrDefault(key, defaultValue);
+            }
+        };
+    }
+
 
     private CompletableFuture<Versioned<Set<BookieSocketAddress>>> getChildren(String regPath, Watcher watcher) {
         CompletableFuture<Versioned<Set<BookieSocketAddress>>> future = FutureUtils.createFuture();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
index 5255ca2..fe4f3e8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
@@ -83,6 +83,8 @@ import org.apache.zookeeper.data.Stat;
 @Slf4j
 public class ZKRegistrationManager implements RegistrationManager {
 
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
     private static final Function<Throwable, BKException> EXCEPTION_FUNC = cause -> {
         if (cause instanceof BKException) {
             log.error("Failed to get bookie list : ", cause);
@@ -228,8 +230,7 @@ public class ZKRegistrationManager implements RegistrationManager {
         }
     }
 
-    private static final ObjectMapper MAPPER = new ObjectMapper();
-    private byte[] serializeBookieServiceInfo(BookieServiceInfo bookieServiceInfo) {
+    private static byte[] serializeBookieServiceInfo(BookieServiceInfo bookieServiceInfo) {
         try {
             Map<String, String> map = new HashMap<>();
             bookieServiceInfo.keys().forEachRemaining(key -> {
@@ -241,7 +242,7 @@ public class ZKRegistrationManager implements RegistrationManager {
             return new byte[0];
         }
     }
-    
+
     private void doRegisterBookie(String regPath, BookieServiceInfo bookieServiceInfo) throws BookieException {
         // ZK ephemeral node for this Bookie.
         try {