You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/10/20 20:20:03 UTC

[pulsar] branch master updated: [fix][broker] PulsarRegistrationClient - implement getAllBookies and follow BookieServiceInfo updates (#18133)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c326c0845e [fix][broker] PulsarRegistrationClient - implement getAllBookies and follow BookieServiceInfo updates (#18133)
0c326c0845e is described below

commit 0c326c0845eb66f513a5f199308620074cdb9b04
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Oct 20 22:19:56 2022 +0200

    [fix][broker] PulsarRegistrationClient - implement getAllBookies and follow BookieServiceInfo updates (#18133)
    
    * [fix] PulsarRegistrationClient - implement getAllBookies and follow BookieServiceInfo updates
    
    * remove debug
---
 .../bookkeeper/PulsarRegistrationClient.java       |  61 ++++++-----
 .../bookkeeper/PulsarRegistrationManager.java      |  21 +++-
 .../bookkeeper/PulsarRegistrationClientTest.java   | 118 +++++++++++++++++++--
 3 files changed, 163 insertions(+), 37 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index f314c0efaf0..da7b1ae5fdb 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -97,14 +96,9 @@ public class PulsarRegistrationClient implements RegistrationClient {
 
     @Override
     public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
-        CompletableFuture<Versioned<Set<BookieId>>> wb = getWritableBookies();
-        CompletableFuture<Versioned<Set<BookieId>>> rb = getReadOnlyBookies();
-        return wb.thenCombine(rb, (rw, ro) -> {
-            Set<BookieId> res = new HashSet<>();
-            res.addAll(rw.getValue());
-            res.addAll(ro.getValue());
-            return new Versioned<>(res, Version.NEW);
-        });
+        // this method is meant to return all the known bookies, even the bookies
+        // that are not in a running state
+        return getChildren(bookieAllRegistrationPath);
     }
 
     @Override
@@ -116,10 +110,9 @@ public class PulsarRegistrationClient implements RegistrationClient {
         return store.getChildren(path)
                 .thenComposeAsync(children -> {
                     Set<BookieId> bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children);
-                    Set<BookieId> bookies = convertToBookieAddresses(children);
-                    List<CompletableFuture<Versioned<BookieServiceInfo>>> bookieInfoUpdated =
-                            new ArrayList<>(bookies.size());
-                    for (BookieId id : bookies) {
+                    List<CompletableFuture<?>> bookieInfoUpdated =
+                            new ArrayList<>(bookieIds.size());
+                    for (BookieId id : bookieIds) {
                         // update the cache for new bookies
                         if (!bookieServiceInfoCache.containsKey(id)) {
                             bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
@@ -160,26 +153,42 @@ public class PulsarRegistrationClient implements RegistrationClient {
         readOnlyBookiesWatchers.remove(registrationListener);
     }
 
-    private void updatedBookies(Notification n) {
-        if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) {
-
-            if (n.getType() == NotificationType.Deleted) {
-                BookieId bookieId = stripBookieIdFromPath(n.getPath());
+    private void handleDeletedBookieNode(Notification n) {
+        if (n.getType() == NotificationType.Deleted) {
+            BookieId bookieId = stripBookieIdFromPath(n.getPath());
+            if (bookieId != null) {
                 log.info("Bookie {} disappeared", bookieId);
-                if (bookieId != null) {
-                    bookieServiceInfoCache.remove(bookieId);
-                }
+                bookieServiceInfoCache.remove(bookieId);
             }
+        }
+    }
 
+    private void handleUpdatedBookieNode(Notification n) {
+        BookieId bookieId = stripBookieIdFromPath(n.getPath());
+        if (bookieId != null) {
+            log.info("Bookie {} info updated", bookieId);
+            readBookieServiceInfoAsync(bookieId);
+        }
+    }
+
+    private void updatedBookies(Notification n) {
+        if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) {
             if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
                 getReadOnlyBookies().thenAccept(bookies -> {
                     readOnlyBookiesWatchers.keySet()
                             .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies)));
                 });
+                handleDeletedBookieNode(n);
             } else if (n.getPath().startsWith(bookieRegistrationPath)) {
                 getWritableBookies().thenAccept(bookies ->
                         writableBookiesWatchers.keySet()
                                 .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
+                handleDeletedBookieNode(n);
+            }
+        } else if (n.getType() == NotificationType.Modified) {
+            if (n.getPath().startsWith(bookieReadonlyRegistrationPath)
+                || n.getPath().startsWith(bookieRegistrationPath)) {
+                handleUpdatedBookieNode(n);
             }
         }
     }
@@ -231,7 +240,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
         }
     }
 
-    public CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsync(BookieId bookieId) {
+    public CompletableFuture<Void> readBookieServiceInfoAsync(BookieId bookieId) {
         String asWritable = bookieRegistrationPath + "/" + bookieId;
         return bookieServiceInfoMetadataCache.get(asWritable)
                 .thenCompose((Optional<BookieServiceInfo> getResult) -> {
@@ -240,7 +249,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
                                 new Versioned<>(getResult.get(), new LongVersion(-1));
                         log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, getResult.get());
                         bookieServiceInfoCache.put(bookieId, res);
-                        return CompletableFuture.completedFuture(res);
+                        return CompletableFuture.completedFuture(null);
                     } else {
                         return readBookieInfoAsReadonlyBookie(bookieId);
                     }
@@ -248,7 +257,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
         );
     }
 
-    final CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
+    final CompletableFuture<Void> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
         String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
         return bookieServiceInfoMetadataCache.get(asReadonly)
                 .thenApply((Optional<BookieServiceInfo> getResultAsReadOnly) -> {
@@ -258,10 +267,8 @@ public class PulsarRegistrationClient implements RegistrationClient {
                         log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId,
                                 getResultAsReadOnly.get());
                         bookieServiceInfoCache.put(bookieId, res);
-                        return res;
-                    } else {
-                        throw new CompletionException(new BKException.BKBookieHandleNotAvailableException());
                     }
+                    return null;
                 });
     }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
index 36840b0134e..ca53339591f 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
@@ -88,11 +88,25 @@ public class PulsarRegistrationManager implements RegistrationManager {
     @SneakyThrows
     public void close() {
         for (ResourceLock<BookieServiceInfo> rwBookie : bookieRegistration.values()) {
-            rwBookie.release().get();
+            try {
+                rwBookie.release().get();
+            } catch (ExecutionException ignore) {
+                log.error("Cannot release correctly {}", rwBookie, ignore.getCause());
+            } catch (InterruptedException ignore) {
+                log.error("Cannot release correctly {}", rwBookie, ignore);
+                Thread.currentThread().interrupt();
+            }
         }
 
         for (ResourceLock<BookieServiceInfo> roBookie : bookieRegistrationReadOnly.values()) {
-            roBookie.release().get();
+            try {
+                roBookie.release().get();
+            } catch (ExecutionException ignore) {
+                log.error("Cannot release correctly {}", roBookie, ignore.getCause());
+            } catch (InterruptedException ignore) {
+                log.error("Cannot release correctly {}", roBookie, ignore);
+                Thread.currentThread().interrupt();
+            }
         }
         coordinationService.close();
     }
@@ -115,11 +129,13 @@ public class PulsarRegistrationManager implements RegistrationManager {
             throws BookieException {
         String regPath = bookieRegistrationPath + "/" + bookieId;
         String regPathReadOnly = bookieReadonlyRegistrationPath + "/" + bookieId;
+        log.info("RegisterBookie {} readOnly {} info {}", bookieId, readOnly, bookieServiceInfo);
 
         try {
             if (readOnly) {
                 ResourceLock<BookieServiceInfo> rwRegistration = bookieRegistration.remove(bookieId);
                 if (rwRegistration != null) {
+                    log.info("Bookie {} was already registered as writable, unregistering", bookieId);
                     rwRegistration.release().get();
                 }
 
@@ -128,6 +144,7 @@ public class PulsarRegistrationManager implements RegistrationManager {
             } else {
                 ResourceLock<BookieServiceInfo> roRegistration = bookieRegistrationReadOnly.remove(bookieId);
                 if (roRegistration != null) {
+                    log.info("Bookie {} was already registered as read-only, unregistering", bookieId);
                     roRegistration.release().get();
                 }
 
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
index 35db2684617..409a6724381 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
@@ -22,9 +22,13 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -33,15 +37,18 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.pulsar.metadata.BaseMetadataStoreTest;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -130,8 +137,12 @@ public class PulsarRegistrationClientTest extends BaseMetadataStoreTest {
         @Cleanup
         RegistrationClient rc = new PulsarRegistrationClient(store, ledgersRoot);
 
-        List<BookieId> addresses = new ArrayList<>(prepareNBookies(10));
-        List<BookieServiceInfo> bookieServiceInfos = new ArrayList<>();
+        List<BookieId> addresses = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            addresses.add(BookieId.parse("BOOKIE-" + i));
+        }
+        Map<BookieId, BookieServiceInfo> bookieServiceInfos = new HashMap<>();
+        Set<BookieId> readOnlyBookies = new HashSet<>();
         int port = 223;
         for (BookieId address : addresses) {
             BookieServiceInfo info = new BookieServiceInfo();
@@ -143,21 +154,113 @@ public class PulsarRegistrationClientTest extends BaseMetadataStoreTest {
             endpoint.setPort(port++);
             endpoint.setProtocol("bookie-rpc");
             info.setEndpoints(Arrays.asList(endpoint));
-            bookieServiceInfos.add(info);
+            bookieServiceInfos.put(address, info);
             // some readonly, some writable
             boolean readOnly = port % 2 == 0;
+            if (readOnly) {
+                readOnlyBookies.add(address);
+            }
             rm.registerBookie(address, readOnly, info);
+            // write the cookie
+            rm.writeCookie(address, new Versioned<>(new byte[0], Version.NEW));
         }
 
         // trigger loading the BookieServiceInfo in the local cache
-        rc.getAllBookies().join();
+        getAndVerifyAllBookies(rc, addresses);
 
-        int i = 0;
+        Awaitility.await().untilAsserted(() -> {
         for (BookieId address : addresses) {
             BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue();
-            compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(i++));
+            compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(address));
+        }});
+
+        // shutdown the bookies (but keep the cookie)
+        for (BookieId address : addresses) {
+            rm.unregisterBookie(address, readOnlyBookies.contains(address));
+            readOnlyBookies.remove(address);
+        }
+
+        // getAllBookies should find all the bookies in any case (it reads the cookies)
+        getAndVerifyAllBookies(rc, addresses);
+
+        // getBookieServiceInfo should fail with BKBookieHandleNotAvailableException
+        Awaitility.await().untilAsserted(() -> {
+        for (BookieId address : addresses) {
+            assertTrue(
+                expectThrows(ExecutionException.class, () -> {
+                    rc.getBookieServiceInfo(address).get();
+            }).getCause() instanceof BKException.BKBookieHandleNotAvailableException);
+        }});
+
+
+        // restart the bookies, all writable
+        // we 'register' the bookie, but do not write the cookie again
+        for (BookieId address : addresses) {
+            rm.registerBookie(address, false, bookieServiceInfos.get(address));
         }
 
+        getAndVerifyAllBookies(rc, addresses);
+
+        // verify that infos are available again
+        Awaitility.await()
+                .ignoreExceptionsMatching(e -> e.getCause() instanceof BKException.BKBookieHandleNotAvailableException)
+                .untilAsserted(() -> {
+                    for (BookieId address : addresses) {
+                        BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue();
+                        compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(address));
+                    }
+                });
+
+        // update the infos
+        port = 111;
+        for (BookieId address : addresses) {
+            BookieServiceInfo info = new BookieServiceInfo();
+            BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
+            endpoint.setAuth(Collections.emptyList());
+            endpoint.setExtensions(Collections.emptyList());
+            endpoint.setId("id");
+            endpoint.setHost("localhost");
+            endpoint.setPort(port++);
+            endpoint.setProtocol("bookie-rpc");
+            info.setEndpoints(Arrays.asList(endpoint));
+            bookieServiceInfos.put(address, info);
+            // some readonly, some writable
+            boolean readOnly = port % 2 == 0;
+
+            // remove the previous info from the metadata service
+            rm.unregisterBookie(address, readOnlyBookies.contains(address));
+
+            rm.registerBookie(address, readOnly, info);
+
+            if (readOnly) {
+                readOnlyBookies.add(address);
+            }
+        }
+
+        // verify that the client tracked the changes
+        Awaitility
+                .await()
+                .ignoreExceptionsMatching(e -> e.getCause() instanceof BKException.BKBookieHandleNotAvailableException)
+                .untilAsserted(() -> {
+            // verify that infos are updated
+            for (BookieId address : addresses) {
+                BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue();
+                compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(address));
+            }
+        });
+
+    }
+
+    private static void getAndVerifyAllBookies(RegistrationClient rc, List<BookieId> addresses)
+            throws InterruptedException, ExecutionException {
+        Set<BookieId> all = rc.getAllBookies().get().getValue();
+        assertEquals(all.size(), addresses.size());
+        for (BookieId id : all) {
+            assertTrue(addresses.contains(id));
+        }
+        for (BookieId id : addresses) {
+            assertTrue(all.contains(id));
+        }
     }
 
     private void compareBookieServiceInfo(BookieServiceInfo a, BookieServiceInfo b) {
@@ -194,8 +297,7 @@ public class PulsarRegistrationClientTest extends BaseMetadataStoreTest {
         List<String> children = new ArrayList<>();
         for (BookieId address : addresses) {
             children.add(address.toString());
-            boolean isReadOnly = children.size() % 2 == 0;
-            rm.registerBookie(address, isReadOnly, new BookieServiceInfo());
+            rm.writeCookie(address, new Versioned<>(new byte[0], Version.NEW));
         }
 
         Versioned<Set<BookieId>> result = result(rc.getAllBookies());