You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2019/12/12 14:31:53 UTC

[bookkeeper] 01/02: BP-38 new API for Bookie discovery

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch fix/endpoint-discovery-impl
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 0845fdb7d5579bd7a538741fcf8ec9bfaca00277
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Dec 12 14:54:33 2019 +0100

    BP-38 new API for Bookie discovery
---
 .../bookkeeper/discover/BookieServiceInfo.java     | 42 ++++++++++++++++++++++
 .../bookkeeper/discover/RegistrationClient.java    | 11 ++++++
 .../bookkeeper/discover/RegistrationManager.java   |  3 +-
 .../bookkeeper/discover/ZKRegistrationClient.java  | 20 +++++++++++
 .../bookkeeper/discover/ZKRegistrationManager.java | 36 ++++++++++++++-----
 5 files changed, 103 insertions(+), 9 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java
new file mode 100644
index 0000000..0ce6add
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/BookieServiceInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.discover;
+
+import java.util.Iterator;
+
+/**
+ * Information about services exposed by a Bookie, accessed as string key/value entries.
+ */
+public interface BookieServiceInfo {
+
+    /**
+     * List the keys of all available entries.
+     * The remove operation is not supported by the returned iterator.
+     * @return an iterator over the keys of the available entries
+     */
+    Iterator<String> keys();
+
+    /**
+     * Return the value of an entry; if the entry is not present the default value is returned.
+     * @param key the key
+     * @param defaultValue the value to return when there is no mapping for the key
+     * @return the current mapping, or defaultValue if there is no mapping for the key
+     */
+    String get(String key, String defaultValue);
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
index 4056329..16ee421 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
@@ -64,6 +64,17 @@ public interface RegistrationClient extends AutoCloseable {
      * @return a future represents the list of readonly bookies.
      */
     CompletableFuture<Versioned<Set<BookieSocketAddress>>> getReadOnlyBookies();
+    
+    /**
+     * Get detailed information about the services exposed by a Bookie.
+     * For old bookies the result is expected to be an empty {@link BookieServiceInfo} structure.
+     *
+     * @param bookieId the id of the bookie; it can be computed from a {@link BookieSocketAddress}
+     * @return a future that represents the available information
+     *
+     * @since 4.11
+     */
+    CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(String bookieId);
 
     /**
      * Watch the changes of bookies.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
index 3d357d4..e5b0f6a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
@@ -59,9 +59,10 @@ public interface RegistrationManager extends AutoCloseable {
      *
      * @param bookieId bookie id
      * @param readOnly whether to register it as writable or readonly
+     * @param serviceInfo information about services exposed by the Bookie
      * @throws BookieException when fail to register a bookie.
      */
-    void registerBookie(String bookieId, boolean readOnly) throws BookieException;
+    void registerBookie(String bookieId, boolean readOnly, BookieServiceInfo serviceInfo) throws BookieException;
 
     /**
      * Unregistering the bookie server as <i>bookieId</i>.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
index 6629f0c..9586dc4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
@@ -45,11 +45,13 @@ import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Version.Occurred;
 import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 
 /**
  * ZooKeeper based {@link RegistrationClient}.
@@ -212,6 +214,79 @@ public class ZKRegistrationClient implements RegistrationClient {
         return getChildren(bookieReadonlyRegistrationPath, null);
     }
 
+    /** JSON mapper used to decode the service info stored in a bookie registration znode. */
+    private static final com.fasterxml.jackson.databind.ObjectMapper SERVICE_INFO_MAPPER =
+            new com.fasterxml.jackson.databind.ObjectMapper();
+
+    /**
+     * Read the service info published by a bookie from its registration znode.
+     *
+     * <p>The znode under the writable registration path is read first; if it does not exist, the
+     * znode under the readonly registration path is tried as well. The returned future fails with a
+     * {@code KeeperException} when no znode can be read and with an {@code IOException} when the
+     * znode data cannot be parsed.
+     *
+     * @param bookieId the id of the bookie
+     * @return a future that completes with the service info, versioned with the znode data version
+     */
+    @Override
+    public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(String bookieId) {
+        final String pathAsWritable = bookieRegistrationPath + "/" + bookieId;
+        final String pathAsReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
+        final CompletableFuture<Versioned<BookieServiceInfo>> res = new CompletableFuture<>();
+        zk.getData(pathAsWritable, false, new AsyncCallback.DataCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) {
+                if (org.apache.zookeeper.KeeperException.Code.OK.intValue() == rc) {
+                    try {
+                        res.complete(new Versioned<>(deserializeBookieServiceInfo(bytes),
+                                new LongVersion(stat.getVersion())));
+                    } catch (java.io.IOException e) {
+                        res.completeExceptionally(e);
+                    }
+                } else if (org.apache.zookeeper.KeeperException.Code.NONODE.intValue() == rc
+                        && pathAsWritable.equals(path)) {
+                    // not registered as writable: look for a readonly registration before giving up
+                    zk.getData(pathAsReadonly, false, this, null);
+                } else {
+                    // always complete the future, otherwise the caller would wait forever
+                    res.completeExceptionally(org.apache.zookeeper.KeeperException.create(
+                            org.apache.zookeeper.KeeperException.Code.get(rc), path));
+                }
+            }
+        }, null);
+        return res;
+    }
+
+    /**
+     * Decode the data of a registration znode into a read-only {@link BookieServiceInfo}.
+     *
+     * @param bytes the znode data; {@code null} or empty data yields an info without entries
+     * @return the decoded info
+     * @throws java.io.IOException if the data cannot be parsed as a JSON object
+     */
+    private static BookieServiceInfo deserializeBookieServiceInfo(byte[] bytes) throws java.io.IOException {
+        java.util.Map<String, String> decoded = null;
+        if (bytes != null && bytes.length > 0) {
+            decoded = SERVICE_INFO_MAPPER.readValue(bytes,
+                    new com.fasterxml.jackson.core.type.TypeReference<java.util.Map<String, String>>() { });
+        }
+        final java.util.Map<String, String> entries =
+                decoded != null ? decoded : java.util.Collections.<String, String>emptyMap();
+        return new BookieServiceInfo() {
+            @Override
+            public java.util.Iterator<String> keys() {
+                // hand out a read-only view: the interface does not support remove()
+                return java.util.Collections.unmodifiableSet(entries.keySet()).iterator();
+            }
+
+            @Override
+            public String get(String key, String defaultValue) {
+                return entries.getOrDefault(key, defaultValue);
+            }
+        };
+    }
+
     private CompletableFuture<Versioned<Set<BookieSocketAddress>>> getChildren(String regPath, Watcher watcher) {
         CompletableFuture<Versioned<Set<BookieSocketAddress>>> future = FutureUtils.createFuture();
         zk.getChildren(regPath, watcher, (rc, path, ctx, children, stat) -> {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
index a2a3e7c..5255ca2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.bookkeeper.discover;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
@@ -29,11 +31,15 @@ import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
@@ -213,21 +219,35 @@ public class ZKRegistrationManager implements RegistrationManager {
     }
 
     @Override
-    public void registerBookie(String bookieId, boolean readOnly) throws BookieException {
+    public void registerBookie(String bookieId, boolean readOnly, BookieServiceInfo bookieServiceInfo) throws BookieException {
         if (!readOnly) {
             String regPath = bookieRegistrationPath + "/" + bookieId;
-            doRegisterBookie(regPath);
+            doRegisterBookie(regPath, bookieServiceInfo);
         } else {
-            doRegisterReadOnlyBookie(bookieId);
+            doRegisterReadOnlyBookie(bookieId, bookieServiceInfo);
         }
     }
 
-    private void doRegisterBookie(String regPath) throws BookieException {
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /** Encode all entries of the given info as a JSON object; log and return an empty array on failure. */
+    private byte[] serializeBookieServiceInfo(BookieServiceInfo bookieServiceInfo) {
+        Map<String, String> entries = new HashMap<>();
+        bookieServiceInfo.keys().forEachRemaining(k -> entries.put(k, bookieServiceInfo.get(k, "")));
+        try {
+            return MAPPER.writeValueAsBytes(entries);
+        } catch (JsonProcessingException cause) {
+            log.error("Cannot serialize bookieServiceInfo {}", bookieServiceInfo, cause);
+            return new byte[0];
+        }
+    }
+
+    private void doRegisterBookie(String regPath, BookieServiceInfo bookieServiceInfo) throws BookieException {
         // ZK ephemeral node for this Bookie.
         try {
             if (!checkRegNodeAndWaitExpired(regPath)) {
                 // Create the ZK ephemeral node for this Bookie.
-                zk.create(regPath, new byte[0], zkAcls, CreateMode.EPHEMERAL);
+                zk.create(regPath, serializeBookieServiceInfo(bookieServiceInfo), zkAcls, CreateMode.EPHEMERAL);
                 zkRegManagerInitialized = true;
             }
         } catch (KeeperException ke) {
@@ -248,11 +268,11 @@ public class ZKRegistrationManager implements RegistrationManager {
         }
     }
 
-    private void doRegisterReadOnlyBookie(String bookieId) throws BookieException {
+    private void doRegisterReadOnlyBookie(String bookieId, BookieServiceInfo bookieServiceInfo) throws BookieException {
         try {
             if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) {
                 try {
-                    zk.create(this.bookieReadonlyRegistrationPath, new byte[0],
+                    zk.create(this.bookieReadonlyRegistrationPath, serializeBookieServiceInfo(bookieServiceInfo),
                               zkAcls, CreateMode.PERSISTENT);
                 } catch (NodeExistsException e) {
                     // this node is just now created by someone.
@@ -260,7 +280,7 @@ public class ZKRegistrationManager implements RegistrationManager {
             }
 
             String regPath = bookieReadonlyRegistrationPath + "/" + bookieId;
-            doRegisterBookie(regPath);
+            doRegisterBookie(regPath, bookieServiceInfo);
             // clear the write state
             regPath = bookieRegistrationPath + "/" + bookieId;
             try {