You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/10/20 20:20:03 UTC
[pulsar] branch master updated: [fix][broker] PulsarRegistrationClient - implement getAllBookies and follow BookieServiceInfo updates (#18133)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0c326c0845e [fix][broker] PulsarRegistrationClient - implement getAllBookies and follow BookieServiceInfo updates (#18133)
0c326c0845e is described below
commit 0c326c0845eb66f513a5f199308620074cdb9b04
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Oct 20 22:19:56 2022 +0200
[fix][broker] PulsarRegistrationClient - implement getAllBookies and follow BookieServiceInfo updates (#18133)
* [fix] PulsarRegistrationClient - implement getAllBookies and follow BookieServiceInfo updates
* remove debug
---
.../bookkeeper/PulsarRegistrationClient.java | 61 ++++++-----
.../bookkeeper/PulsarRegistrationManager.java | 21 +++-
.../bookkeeper/PulsarRegistrationClientTest.java | 118 +++++++++++++++++++--
3 files changed, 163 insertions(+), 37 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index f314c0efaf0..da7b1ae5fdb 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -97,14 +96,9 @@ public class PulsarRegistrationClient implements RegistrationClient {
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
- CompletableFuture<Versioned<Set<BookieId>>> wb = getWritableBookies();
- CompletableFuture<Versioned<Set<BookieId>>> rb = getReadOnlyBookies();
- return wb.thenCombine(rb, (rw, ro) -> {
- Set<BookieId> res = new HashSet<>();
- res.addAll(rw.getValue());
- res.addAll(ro.getValue());
- return new Versioned<>(res, Version.NEW);
- });
+ // this method is meant to return all the known bookies, even the bookies
+ // that are not in a running state
+ return getChildren(bookieAllRegistrationPath);
}
@Override
@@ -116,10 +110,9 @@ public class PulsarRegistrationClient implements RegistrationClient {
return store.getChildren(path)
.thenComposeAsync(children -> {
Set<BookieId> bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children);
- Set<BookieId> bookies = convertToBookieAddresses(children);
- List<CompletableFuture<Versioned<BookieServiceInfo>>> bookieInfoUpdated =
- new ArrayList<>(bookies.size());
- for (BookieId id : bookies) {
+ List<CompletableFuture<?>> bookieInfoUpdated =
+ new ArrayList<>(bookieIds.size());
+ for (BookieId id : bookieIds) {
// update the cache for new bookies
if (!bookieServiceInfoCache.containsKey(id)) {
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
@@ -160,26 +153,42 @@ public class PulsarRegistrationClient implements RegistrationClient {
readOnlyBookiesWatchers.remove(registrationListener);
}
- private void updatedBookies(Notification n) {
- if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) {
-
- if (n.getType() == NotificationType.Deleted) {
- BookieId bookieId = stripBookieIdFromPath(n.getPath());
+ private void handleDeletedBookieNode(Notification n) {
+ if (n.getType() == NotificationType.Deleted) {
+ BookieId bookieId = stripBookieIdFromPath(n.getPath());
+ if (bookieId != null) {
log.info("Bookie {} disappeared", bookieId);
- if (bookieId != null) {
- bookieServiceInfoCache.remove(bookieId);
- }
+ bookieServiceInfoCache.remove(bookieId);
}
+ }
+ }
+ private void handleUpdatedBookieNode(Notification n) {
+ BookieId bookieId = stripBookieIdFromPath(n.getPath());
+ if (bookieId != null) {
+ log.info("Bookie {} info updated", bookieId);
+ readBookieServiceInfoAsync(bookieId);
+ }
+ }
+
+ private void updatedBookies(Notification n) {
+ if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) {
if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
getReadOnlyBookies().thenAccept(bookies -> {
readOnlyBookiesWatchers.keySet()
.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies)));
});
+ handleDeletedBookieNode(n);
} else if (n.getPath().startsWith(bookieRegistrationPath)) {
getWritableBookies().thenAccept(bookies ->
writableBookiesWatchers.keySet()
.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
+ handleDeletedBookieNode(n);
+ }
+ } else if (n.getType() == NotificationType.Modified) {
+ if (n.getPath().startsWith(bookieReadonlyRegistrationPath)
+ || n.getPath().startsWith(bookieRegistrationPath)) {
+ handleUpdatedBookieNode(n);
}
}
}
@@ -231,7 +240,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
}
}
- public CompletableFuture<Versioned<BookieServiceInfo>> readBookieServiceInfoAsync(BookieId bookieId) {
+ public CompletableFuture<Void> readBookieServiceInfoAsync(BookieId bookieId) {
String asWritable = bookieRegistrationPath + "/" + bookieId;
return bookieServiceInfoMetadataCache.get(asWritable)
.thenCompose((Optional<BookieServiceInfo> getResult) -> {
@@ -240,7 +249,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
new Versioned<>(getResult.get(), new LongVersion(-1));
log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, getResult.get());
bookieServiceInfoCache.put(bookieId, res);
- return CompletableFuture.completedFuture(res);
+ return CompletableFuture.completedFuture(null);
} else {
return readBookieInfoAsReadonlyBookie(bookieId);
}
@@ -248,7 +257,7 @@ public class PulsarRegistrationClient implements RegistrationClient {
);
}
- final CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
+ final CompletableFuture<Void> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
return bookieServiceInfoMetadataCache.get(asReadonly)
.thenApply((Optional<BookieServiceInfo> getResultAsReadOnly) -> {
@@ -258,10 +267,8 @@ public class PulsarRegistrationClient implements RegistrationClient {
log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId,
getResultAsReadOnly.get());
bookieServiceInfoCache.put(bookieId, res);
- return res;
- } else {
- throw new CompletionException(new BKException.BKBookieHandleNotAvailableException());
}
+ return null;
});
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
index 36840b0134e..ca53339591f 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManager.java
@@ -88,11 +88,25 @@ public class PulsarRegistrationManager implements RegistrationManager {
@SneakyThrows
public void close() {
for (ResourceLock<BookieServiceInfo> rwBookie : bookieRegistration.values()) {
- rwBookie.release().get();
+ try {
+ rwBookie.release().get();
+ } catch (ExecutionException ignore) {
+ log.error("Cannot release correctly {}", rwBookie, ignore.getCause());
+ } catch (InterruptedException ignore) {
+ log.error("Cannot release correctly {}", rwBookie, ignore);
+ Thread.currentThread().interrupt();
+ }
}
for (ResourceLock<BookieServiceInfo> roBookie : bookieRegistrationReadOnly.values()) {
- roBookie.release().get();
+ try {
+ roBookie.release().get();
+ } catch (ExecutionException ignore) {
+ log.error("Cannot release correctly {}", roBookie, ignore.getCause());
+ } catch (InterruptedException ignore) {
+ log.error("Cannot release correctly {}", roBookie, ignore);
+ Thread.currentThread().interrupt();
+ }
}
coordinationService.close();
}
@@ -115,11 +129,13 @@ public class PulsarRegistrationManager implements RegistrationManager {
throws BookieException {
String regPath = bookieRegistrationPath + "/" + bookieId;
String regPathReadOnly = bookieReadonlyRegistrationPath + "/" + bookieId;
+ log.info("RegisterBookie {} readOnly {} info {}", bookieId, readOnly, bookieServiceInfo);
try {
if (readOnly) {
ResourceLock<BookieServiceInfo> rwRegistration = bookieRegistration.remove(bookieId);
if (rwRegistration != null) {
+ log.info("Bookie {} was already registered as writable, unregistering");
rwRegistration.release().get();
}
@@ -128,6 +144,7 @@ public class PulsarRegistrationManager implements RegistrationManager {
} else {
ResourceLock<BookieServiceInfo> roRegistration = bookieRegistrationReadOnly.remove(bookieId);
if (roRegistration != null) {
+ log.info("Bookie {} was already registered as read-only, unregistering");
roRegistration.release().get();
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
index 35db2684617..409a6724381 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
@@ -22,9 +22,13 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -33,15 +37,18 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.metadata.BaseMetadataStoreTest;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -130,8 +137,12 @@ public class PulsarRegistrationClientTest extends BaseMetadataStoreTest {
@Cleanup
RegistrationClient rc = new PulsarRegistrationClient(store, ledgersRoot);
- List<BookieId> addresses = new ArrayList<>(prepareNBookies(10));
- List<BookieServiceInfo> bookieServiceInfos = new ArrayList<>();
+ List<BookieId> addresses = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ addresses.add(BookieId.parse("BOOKIE-" + i));
+ }
+ Map<BookieId, BookieServiceInfo> bookieServiceInfos = new HashMap<>();
+ Set<BookieId> readOnlyBookies = new HashSet<>();
int port = 223;
for (BookieId address : addresses) {
BookieServiceInfo info = new BookieServiceInfo();
@@ -143,21 +154,113 @@ public class PulsarRegistrationClientTest extends BaseMetadataStoreTest {
endpoint.setPort(port++);
endpoint.setProtocol("bookie-rpc");
info.setEndpoints(Arrays.asList(endpoint));
- bookieServiceInfos.add(info);
+ bookieServiceInfos.put(address, info);
// some readonly, some writable
boolean readOnly = port % 2 == 0;
+ if (readOnly) {
+ readOnlyBookies.add(address);
+ }
rm.registerBookie(address, readOnly, info);
+ // write the cookie
+ rm.writeCookie(address, new Versioned<>(new byte[0], Version.NEW));
}
// trigger loading the BookieServiceInfo in the local cache
- rc.getAllBookies().join();
+ getAndVerifyAllBookies(rc, addresses);
- int i = 0;
+ Awaitility.await().untilAsserted(() -> {
for (BookieId address : addresses) {
BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue();
- compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(i++));
+ compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(address));
+ }});
+
+ // shutdown the bookies (but keep the cookie)
+ for (BookieId address : addresses) {
+ rm.unregisterBookie(address, readOnlyBookies.contains(address));
+ readOnlyBookies.remove(address);
+ }
+
+ // getAllBookies should find all the bookies in any case (it reads the cookies)
+ getAndVerifyAllBookies(rc, addresses);
+
+ // getBookieServiceInfo should fail with BKBookieHandleNotAvailableException
+ Awaitility.await().untilAsserted(() -> {
+ for (BookieId address : addresses) {
+ assertTrue(
+ expectThrows(ExecutionException.class, () -> {
+ rc.getBookieServiceInfo(address).get();
+ }).getCause() instanceof BKException.BKBookieHandleNotAvailableException);
+ }});
+
+
+ // restart the bookies, all writable
+ // we 'register' the bookie, but do not write the cookie again
+ for (BookieId address : addresses) {
+ rm.registerBookie(address, false, bookieServiceInfos.get(address));
}
+ getAndVerifyAllBookies(rc, addresses);
+
+ // verify that infos are available again
+ Awaitility.await()
+ .ignoreExceptionsMatching(e -> e.getCause() instanceof BKException.BKBookieHandleNotAvailableException)
+ .untilAsserted(() -> {
+ for (BookieId address : addresses) {
+ BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue();
+ compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(address));
+ }
+ });
+
+ // update the infos
+ port = 111;
+ for (BookieId address : addresses) {
+ BookieServiceInfo info = new BookieServiceInfo();
+ BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
+ endpoint.setAuth(Collections.emptyList());
+ endpoint.setExtensions(Collections.emptyList());
+ endpoint.setId("id");
+ endpoint.setHost("localhost");
+ endpoint.setPort(port++);
+ endpoint.setProtocol("bookie-rpc");
+ info.setEndpoints(Arrays.asList(endpoint));
+ bookieServiceInfos.put(address, info);
+ // some readonly, some writable
+ boolean readOnly = port % 2 == 0;
+
+ // remove the previous info from the metadata service
+ rm.unregisterBookie(address, readOnlyBookies.contains(address));
+
+ rm.registerBookie(address, readOnly, info);
+
+ if (readOnly) {
+ readOnlyBookies.add(address);
+ }
+ }
+
+ // verify that the client tracked the changes
+ Awaitility
+ .await()
+ .ignoreExceptionsMatching(e -> e.getCause() instanceof BKException.BKBookieHandleNotAvailableException)
+ .untilAsserted(() -> {
+ // verify that infos are updated
+ for (BookieId address : addresses) {
+ BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue();
+ compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(address));
+ }
+ });
+
+ }
+
+ private static void getAndVerifyAllBookies(RegistrationClient rc, List<BookieId> addresses)
+ throws InterruptedException, ExecutionException {
+ Set<BookieId> all = rc.getAllBookies().get().getValue();
+ assertEquals(all.size(), addresses.size());
+ for (BookieId id : all) {
+ assertTrue(addresses.contains(id));
+ }
+ for (BookieId id : addresses) {
+ assertTrue(all.contains(id));
+ }
}
private void compareBookieServiceInfo(BookieServiceInfo a, BookieServiceInfo b) {
@@ -194,8 +297,7 @@ public class PulsarRegistrationClientTest extends BaseMetadataStoreTest {
List<String> children = new ArrayList<>();
for (BookieId address : addresses) {
children.add(address.toString());
- boolean isReadOnly = children.size() % 2 == 0;
- rm.registerBookie(address, isReadOnly, new BookieServiceInfo());
+ rm.writeCookie(address, new Versioned<>(new byte[0], Version.NEW));
}
Versioned<Set<BookieId>> result = result(rc.getAllBookies());