You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/10/15 16:24:36 UTC

[pulsar] branch master updated: Make BookieId work with PulsarRegistrationDriver (second take) (#17922)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 09f5eeb0c94 Make BookieId work with PulsarRegistrationDriver (second take) (#17922)
09f5eeb0c94 is described below

commit 09f5eeb0c946ee890483e087f802e30a2a2b60ab
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Sat Oct 15 18:24:20 2022 +0200

    Make BookieId work with PulsarRegistrationDriver (second take) (#17922)
    
    * Make BookieId work with PulsarRegistrationDriver (#17762)
    
    * Make BookieId work with PulsarRegistrationDriver
    
    * Switch to MetadataCache
    
    * checkstyle
    
    * Do not execute lookup on MetadataCache in the getBookieServiceInfo caller thread
---
 .../bookkeeper/BookieServiceInfoSerde.java         |  55 +++++++++-
 .../bookkeeper/PulsarRegistrationClient.java       | 119 ++++++++++++++++++++-
 .../bookkeeper/PulsarRegistrationClientTest.java   |  62 +++++++++++
 3 files changed, 230 insertions(+), 6 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
index 78a33179e76..b7e3024b637 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.discover.BookieServiceInfo;
+import org.apache.bookkeeper.discover.BookieServiceInfoUtils;
 import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.Stat;
@@ -63,7 +64,57 @@ public class BookieServiceInfoSerde implements MetadataSerde<BookieServiceInfo>
     }
 
     @Override
-    public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException {
-        return null;
+    public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException {
+        // see https://github.com/apache/bookkeeper/blob/
+        // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/
+        // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311
+        String bookieId = extractBookiedIdFromPath(path);
+        if (bookieServiceInfo == null || bookieServiceInfo.length == 0) {
+            return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId);
+        }
+
+        BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo);
+        BookieServiceInfo bsi = new BookieServiceInfo();
+        List<BookieServiceInfo.Endpoint> endpoints = builder.getEndpointsList().stream()
+                .map(e -> {
+                    BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
+                    endpoint.setId(e.getId());
+                    endpoint.setPort(e.getPort());
+                    endpoint.setHost(e.getHost());
+                    endpoint.setProtocol(e.getProtocol());
+                    endpoint.setAuth(e.getAuthList());
+                    endpoint.setExtensions(e.getExtensionsList());
+                    return endpoint;
+                })
+                .collect(Collectors.toList());
+
+        bsi.setEndpoints(endpoints);
+        bsi.setProperties(builder.getPropertiesMap());
+
+        return bsi;
+
+    }
+
+    /**
+     * Extract the BookieId
+     * The path should look like /ledgers/available/bookieId
+     * or /ledgers/available/readonly/bookieId.
+     * But the prefix depends on the configuration.
+     * @param path
+     * @return the bookieId
+     */
+    private static String extractBookiedIdFromPath(String path) throws IOException {
+        // https://github.com/apache/bookkeeper/blob/
+        // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/
+        // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258
+        if (path == null) {
+            path = "";
+        }
+        int last = path.lastIndexOf("/");
+        if (last >= 0) {
+            return path.substring(last + 1);
+        } else {
+            throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node");
+        }
     }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index 52b50e3ea4b..f314c0efaf0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -22,22 +22,33 @@ import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 
+@Slf4j
 public class PulsarRegistrationClient implements RegistrationClient {
 
     private final MetadataStore store;
@@ -47,14 +58,18 @@ public class PulsarRegistrationClient implements RegistrationClient {
     private final String bookieAllRegistrationPath;
     private final String bookieReadonlyRegistrationPath;
 
+    private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> bookieServiceInfoCache =
+                                                                                    new ConcurrentHashMap();
     private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
     private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
+    private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
     private final ScheduledExecutorService executor;
 
     public PulsarRegistrationClient(MetadataStore store,
                                     String ledgersRootPath) {
         this.store = store;
         this.ledgersRootPath = ledgersRootPath;
+        this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
 
         // Following Bookie Network Address Changes is an expensive operation
         // as it requires additional ZooKeeper watches
@@ -99,7 +114,25 @@ public class PulsarRegistrationClient implements RegistrationClient {
 
     private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String path) {
         return store.getChildren(path)
-                .thenApply(PulsarRegistrationClient::convertToBookieAddresses)
+                .thenComposeAsync(children -> {
+                    Set<BookieId> bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children);
+                    Set<BookieId> bookies = convertToBookieAddresses(children);
+                    List<CompletableFuture<Versioned<BookieServiceInfo>>> bookieInfoUpdated =
+                            new ArrayList<>(bookies.size());
+                    for (BookieId id : bookies) {
+                        // update the cache for new bookies
+                        if (!bookieServiceInfoCache.containsKey(id)) {
+                            bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
+                        }
+                    }
+                    if (bookieInfoUpdated.isEmpty()) {
+                        return CompletableFuture.completedFuture(bookieIds);
+                    } else {
+                        return FutureUtil
+                                .waitForAll(bookieInfoUpdated)
+                                .thenApply(___ -> bookieIds);
+                    }
+                })
                 .thenApply(s -> new Versioned<>(s, Version.NEW));
     }
 
@@ -129,10 +162,20 @@ public class PulsarRegistrationClient implements RegistrationClient {
 
     private void updatedBookies(Notification n) {
         if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) {
+
+            if (n.getType() == NotificationType.Deleted) {
+                BookieId bookieId = stripBookieIdFromPath(n.getPath());
+                log.info("Bookie {} disappeared", bookieId);
+                if (bookieId != null) {
+                    bookieServiceInfoCache.remove(bookieId);
+                }
+            }
+
             if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
-                getReadOnlyBookies().thenAccept(bookies ->
-                        readOnlyBookiesWatchers.keySet()
-                                .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
+                getReadOnlyBookies().thenAccept(bookies -> {
+                    readOnlyBookiesWatchers.keySet()
+                            .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies)));
+                });
             } else if (n.getPath().startsWith(bookieRegistrationPath)) {
                 getWritableBookies().thenAccept(bookies ->
                         writableBookiesWatchers.keySet()
@@ -141,6 +184,22 @@ public class PulsarRegistrationClient implements RegistrationClient {
         }
     }
 
+    private static BookieId stripBookieIdFromPath(String path) {
+        if (path == null) {
+            return null;
+        }
+        final int slash = path.lastIndexOf('/');
+        if (slash >= 0) {
+            try {
+                return BookieId.parse(path.substring(slash + 1));
+            } catch (IllegalArgumentException e) {
+                log.warn("Cannot decode bookieId from {}", path, e);
+            }
+        }
+        return null;
+    }
+
+
     private static Set<BookieId> convertToBookieAddresses(List<String> children) {
         // Read the bookie addresses into a set for efficient lookup
         HashSet<BookieId> newBookieAddrs = new HashSet<>();
@@ -153,4 +212,56 @@ public class PulsarRegistrationClient implements RegistrationClient {
         }
         return newBookieAddrs;
     }
+
+    @Override
+    public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(BookieId bookieId) {
+        // this method cannot perform blocking calls to the MetadataStore
+        // or return a CompletableFuture that is completed on the MetadataStore main thread
+        // this is because there are a few cases in which some operations on the main thread
+        // wait for the result. This is due to the fact that resolving the address of a bookie
+        // is needed in many code paths.
+        Versioned<BookieServiceInfo> resultFromCache = bookieServiceInfoCache.get(bookieId);
+        if (log.isDebugEnabled()) {
+            log.debug("getBookieServiceInfo {} -> {}", bookieId, resultFromCache);
+        }
+        if (resultFromCache != null) {
+            return CompletableFuture.completedFuture(resultFromCache);
+        } else {
+            return FutureUtils.exception(new BKException.BKBookieHandleNotAvailableException());
+        }
+    }
+
+    public CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsync(BookieId bookieId) {
+        String asWritable = bookieRegistrationPath + "/" + bookieId;
+        return bookieServiceInfoMetadataCache.get(asWritable)
+                .thenCompose((Optional<BookieServiceInfo> getResult) -> {
+                    if (getResult.isPresent()) {
+                        Versioned<BookieServiceInfo> res =
+                                new Versioned<>(getResult.get(), new LongVersion(-1));
+                        log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, getResult.get());
+                        bookieServiceInfoCache.put(bookieId, res);
+                        return CompletableFuture.completedFuture(res);
+                    } else {
+                        return readBookieInfoAsReadonlyBookie(bookieId);
+                    }
+                }
+        );
+    }
+
+    final CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
+        String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
+        return bookieServiceInfoMetadataCache.get(asReadonly)
+                .thenApply((Optional<BookieServiceInfo> getResultAsReadOnly) -> {
+                    if (getResultAsReadOnly.isPresent()) {
+                        Versioned<BookieServiceInfo> res =
+                                new Versioned<>(getResultAsReadOnly.get(), new LongVersion(-1));
+                        log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId,
+                                getResultAsReadOnly.get());
+                        bookieServiceInfoCache.put(bookieId, res);
+                        return res;
+                    } else {
+                        throw new CompletionException(new BKException.BKBookieHandleNotAvailableException());
+                    }
+                });
+    }
 }
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
index 496cfebea51..35db2684617 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
@@ -23,6 +23,8 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.mockito.Mockito.mock;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -114,6 +116,66 @@ public class PulsarRegistrationClientTest extends BaseMetadataStoreTest {
         assertEquals(result.getValue().size(), addresses.size());
     }
 
+    @Test(dataProvider = "impl")
+    public void testGetBookieServiceInfo(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store =
+                MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+        String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
+
+        @Cleanup
+        RegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, mock(AbstractConfiguration.class));
+
+        @Cleanup
+        RegistrationClient rc = new PulsarRegistrationClient(store, ledgersRoot);
+
+        List<BookieId> addresses = new ArrayList<>(prepareNBookies(10));
+        List<BookieServiceInfo> bookieServiceInfos = new ArrayList<>();
+        int port = 223;
+        for (BookieId address : addresses) {
+            BookieServiceInfo info = new BookieServiceInfo();
+            BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
+            endpoint.setAuth(Collections.emptyList());
+            endpoint.setExtensions(Collections.emptyList());
+            endpoint.setId("id");
+            endpoint.setHost("localhost");
+            endpoint.setPort(port++);
+            endpoint.setProtocol("bookie-rpc");
+            info.setEndpoints(Arrays.asList(endpoint));
+            bookieServiceInfos.add(info);
+            // some readonly, some writable
+            boolean readOnly = port % 2 == 0;
+            rm.registerBookie(address, readOnly, info);
+        }
+
+        // trigger loading the BookieServiceInfo in the local cache
+        rc.getAllBookies().join();
+
+        int i = 0;
+        for (BookieId address : addresses) {
+            BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue();
+            compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(i++));
+        }
+
+    }
+
+    private void compareBookieServiceInfo(BookieServiceInfo a, BookieServiceInfo b) {
+        assertEquals(a.getProperties(), b.getProperties());
+        assertEquals(a.getEndpoints().size(), b.getEndpoints().size());
+        for (int i = 0; i < a.getEndpoints().size(); i++) {
+            BookieServiceInfo.Endpoint e1 = a.getEndpoints().get(i);
+            BookieServiceInfo.Endpoint e2 = b.getEndpoints().get(i);
+            assertEquals(e1.getHost(), e2.getHost());
+            assertEquals(e1.getPort(), e2.getPort());
+            assertEquals(e1.getId(), e2.getId());
+            assertEquals(e1.getProtocol(), e2.getProtocol());
+            assertEquals(e1.getExtensions(), e2.getExtensions());
+            assertEquals(e1.getAuth(), e2.getAuth());
+        }
+
+    }
+
     @Test(dataProvider = "impl")
     public void testGetAllBookies(String provider, Supplier<String> urlSupplier) throws Exception {
         @Cleanup