You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/26 01:28:47 UTC

[pulsar] branch branch-2.11 updated (1c63ee37cfe -> ba890cc9500)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 1c63ee37cfe [doc][proxy] Fix typo in "Document how to mitigate CVE-2022-24280"
     new b121d854c09 [fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookeeper event thread (#17620)
     new ba890cc9500 Make BookieId work with PulsarRegistrationDriver (#17762)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/BookieServiceInfoSerde.java         | 55 +++++++++++++++++++-
 .../bookkeeper/PulsarRegistrationClient.java       | 52 +++++++++++++++++--
 .../bookkeeper/PulsarRegistrationClientTest.java   | 59 ++++++++++++++++++++++
 3 files changed, 160 insertions(+), 6 deletions(-)


[pulsar] 01/02: [fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookeeper event thread (#17620)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b121d854c0933309475891774b7a90911afb2c81
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Sep 14 11:24:57 2022 +0300

    [fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookeeper event thread (#17620)
---
 .../metadata/bookkeeper/PulsarRegistrationClient.java    | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index 38e2a33ef3f..52b50e3ea4b 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -21,12 +21,15 @@ package org.apache.pulsar.metadata.bookkeeper;
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.versioning.Version;
@@ -46,6 +49,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
 
     private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
     private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
+    private final ScheduledExecutorService executor;
 
     public PulsarRegistrationClient(MetadataStore store,
                                     String ledgersRootPath) {
@@ -60,11 +64,15 @@ public class PulsarRegistrationClient implements RegistrationClient {
         this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE;
         this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;
 
+        this.executor = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));
+
         store.registerListener(this::updatedBookies);
     }
 
     @Override
     public void close() {
+        executor.shutdownNow();
     }
 
     @Override
@@ -99,7 +107,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
     public CompletableFuture<Void> watchWritableBookies(RegistrationListener registrationListener) {
         writableBookiesWatchers.put(registrationListener, Boolean.TRUE);
         return getWritableBookies()
-                .thenAccept(registrationListener::onBookiesChanged);
+                .thenAcceptAsync(registrationListener::onBookiesChanged, executor);
     }
 
     @Override
@@ -111,7 +119,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
     public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener registrationListener) {
         readOnlyBookiesWatchers.put(registrationListener, Boolean.TRUE);
         return getReadOnlyBookies()
-                .thenAccept(registrationListener::onBookiesChanged);
+                .thenAcceptAsync(registrationListener::onBookiesChanged, executor);
     }
 
     @Override
@@ -124,11 +132,11 @@ public class PulsarRegistrationClient implements RegistrationClient {
             if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
                 getReadOnlyBookies().thenAccept(bookies ->
                         readOnlyBookiesWatchers.keySet()
-                                .forEach(w -> w.onBookiesChanged(bookies)));
+                                .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
             } else if (n.getPath().startsWith(bookieRegistrationPath)) {
                 getWritableBookies().thenAccept(bookies ->
                         writableBookiesWatchers.keySet()
-                                .forEach(w -> w.onBookiesChanged(bookies)));
+                                .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
             }
         }
     }


[pulsar] 02/02: Make BookieId work with PulsarRegistrationDriver (#17762)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ba890cc950066fccfea6f3f6bf4cb72c33f5d063
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Sat Sep 24 22:07:49 2022 +0200

    Make BookieId work with PulsarRegistrationDriver (#17762)
    
    * Make BookieId work with PulsarRegistrationDriver
    
    * Switch to MetadataCache
    
    * checkstyle
---
 .../bookkeeper/BookieServiceInfoSerde.java         | 55 +++++++++++++++++++-
 .../bookkeeper/PulsarRegistrationClient.java       | 36 +++++++++++++
 .../bookkeeper/PulsarRegistrationClientTest.java   | 59 ++++++++++++++++++++++
 3 files changed, 148 insertions(+), 2 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
index 78a33179e76..b7e3024b637 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.discover.BookieServiceInfo;
+import org.apache.bookkeeper.discover.BookieServiceInfoUtils;
 import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.Stat;
@@ -63,7 +64,57 @@ public class BookieServiceInfoSerde implements MetadataSerde<BookieServiceInfo>
     }
 
     @Override
-    public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException {
-        return null;
+    public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException {
+        // see https://github.com/apache/bookkeeper/blob/
+        // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/
+        // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311
+        String bookieId = extractBookiedIdFromPath(path);
+        if (bookieServiceInfo == null || bookieServiceInfo.length == 0) {
+            return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId);
+        }
+
+        BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo);
+        BookieServiceInfo bsi = new BookieServiceInfo();
+        List<BookieServiceInfo.Endpoint> endpoints = builder.getEndpointsList().stream()
+                .map(e -> {
+                    BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
+                    endpoint.setId(e.getId());
+                    endpoint.setPort(e.getPort());
+                    endpoint.setHost(e.getHost());
+                    endpoint.setProtocol(e.getProtocol());
+                    endpoint.setAuth(e.getAuthList());
+                    endpoint.setExtensions(e.getExtensionsList());
+                    return endpoint;
+                })
+                .collect(Collectors.toList());
+
+        bsi.setEndpoints(endpoints);
+        bsi.setProperties(builder.getPropertiesMap());
+
+        return bsi;
+
+    }
+
+    /**
+     * Extract the BookieId
+     * The path should look like /ledgers/available/bookieId
+     * or /ledgers/available/readonly/bookieId.
+     * But the prefix depends on the configuration.
+     * @param path
+     * @return the bookieId
+     */
+    private static String extractBookiedIdFromPath(String path) throws IOException {
+        // https://github.com/apache/bookkeeper/blob/
+        // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/
+        // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258
+        if (path == null) {
+            path = "";
+        }
+        int last = path.lastIndexOf("/");
+        if (last >= 0) {
+            return path.substring(last + 1);
+        } else {
+            throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node");
+        }
     }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index 52b50e3ea4b..1c692404318 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -25,15 +25,21 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
@@ -49,12 +55,14 @@ public class PulsarRegistrationClient implements RegistrationClient {
 
     private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
     private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
+    private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
     private final ScheduledExecutorService executor;
 
     public PulsarRegistrationClient(MetadataStore store,
                                     String ledgersRootPath) {
         this.store = store;
         this.ledgersRootPath = ledgersRootPath;
+        this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
 
         // Following Bookie Network Address Changes is an expensive operation
         // as it requires additional ZooKeeper watches
@@ -153,4 +161,32 @@ public class PulsarRegistrationClient implements RegistrationClient {
         }
         return newBookieAddrs;
     }
+
+    @Override
+    public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(BookieId bookieId) {
+        String asWritable = bookieRegistrationPath + "/" + bookieId;
+
+        return bookieServiceInfoMetadataCache.get(asWritable)
+                .thenCompose((Optional<BookieServiceInfo> getResult) -> {
+                    if (getResult.isPresent()) {
+                        return CompletableFuture.completedFuture(new Versioned<>(getResult.get(),
+                                    new LongVersion(-1)));
+                    } else {
+                        return readBookieInfoAsReadonlyBookie(bookieId);
+                    }
+                }
+        );
+    }
+
+    final CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
+        String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
+        return bookieServiceInfoMetadataCache.get(asReadonly)
+                .thenApply((Optional<BookieServiceInfo> getResultAsReadOnly) -> {
+                    if (getResultAsReadOnly.isPresent()) {
+                        return new Versioned<>(getResultAsReadOnly.get(), new LongVersion(-1));
+                    } else {
+                        throw new CompletionException(new BKException.BKBookieHandleNotAvailableException());
+                    }
+                });
+    }
 }
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
index 496cfebea51..047bedd1587 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
@@ -23,6 +23,8 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.mockito.Mockito.mock;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -114,6 +116,63 @@ public class PulsarRegistrationClientTest extends BaseMetadataStoreTest {
         assertEquals(result.getValue().size(), addresses.size());
     }
 
+    @Test(dataProvider = "impl")
+    public void testGetBookieServiceInfo(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store =
+                MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+        String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
+
+        @Cleanup
+        RegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, mock(AbstractConfiguration.class));
+
+        @Cleanup
+        RegistrationClient rc = new PulsarRegistrationClient(store, ledgersRoot);
+
+        List<BookieId> addresses = new ArrayList<>(prepareNBookies(10));
+        List<BookieServiceInfo> bookieServiceInfos = new ArrayList<>();
+        int port = 223;
+        for (BookieId address : addresses) {
+            BookieServiceInfo info = new BookieServiceInfo();
+            BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
+            endpoint.setAuth(Collections.emptyList());
+            endpoint.setExtensions(Collections.emptyList());
+            endpoint.setId("id");
+            endpoint.setHost("localhost");
+            endpoint.setPort(port++);
+            endpoint.setProtocol("bookie-rpc");
+            info.setEndpoints(Arrays.asList(endpoint));
+            bookieServiceInfos.add(info);
+            // some readonly, some writable
+            boolean readOnly = port % 2 == 0;
+            rm.registerBookie(address, readOnly, info);
+        }
+
+        int i = 0;
+        for (BookieId address : addresses) {
+            BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue();
+            compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(i++));
+        }
+
+    }
+
+    private void compareBookieServiceInfo(BookieServiceInfo a, BookieServiceInfo b) {
+        assertEquals(a.getProperties(), b.getProperties());
+        assertEquals(a.getEndpoints().size(), b.getEndpoints().size());
+        for (int i = 0; i < a.getEndpoints().size(); i++) {
+            BookieServiceInfo.Endpoint e1 = a.getEndpoints().get(i);
+            BookieServiceInfo.Endpoint e2 = b.getEndpoints().get(i);
+            assertEquals(e1.getHost(), e2.getHost());
+            assertEquals(e1.getPort(), e2.getPort());
+            assertEquals(e1.getId(), e2.getId());
+            assertEquals(e1.getProtocol(), e2.getProtocol());
+            assertEquals(e1.getExtensions(), e2.getExtensions());
+            assertEquals(e1.getAuth(), e2.getAuth());
+        }
+
+    }
+
     @Test(dataProvider = "impl")
     public void testGetAllBookies(String provider, Supplier<String> urlSupplier) throws Exception {
         @Cleanup