You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/25 04:00:00 UTC
[pulsar] branch branch-2.10 updated: Make BookieId work with PulsarRegistrationDriver (#17762)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 6c281dae2f8 Make BookieId work with PulsarRegistrationDriver (#17762)
6c281dae2f8 is described below
commit 6c281dae2f8482cb7abca571bada4be92d9d6a91
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Sat Sep 24 22:07:49 2022 +0200
Make BookieId work with PulsarRegistrationDriver (#17762)
* Make BookieId work with PulsarRegistrationDriver
* Switch to MetadataCache
* checkstyle
(cherry picked from commit 8d7ac33751c62383b510a04ec223981bd70cd4db)
---
.../bookkeeper/BookieServiceInfoSerde.java | 55 +++++++++++++++++++-
.../bookkeeper/PulsarRegistrationClient.java | 36 +++++++++++++
.../bookkeeper/PulsarRegistrationClientTest.java | 59 ++++++++++++++++++++++
3 files changed, 148 insertions(+), 2 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
index 78a33179e76..b7e3024b637 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.discover.BookieServiceInfo;
+import org.apache.bookkeeper.discover.BookieServiceInfoUtils;
import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.Stat;
@@ -63,7 +64,57 @@ public class BookieServiceInfoSerde implements MetadataSerde<BookieServiceInfo>
}
@Override
- public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException {
- return null;
+ public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException {
+ // see https://github.com/apache/bookkeeper/blob/
+ // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/
+ // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311
+ String bookieId = extractBookiedIdFromPath(path);
+ if (bookieServiceInfo == null || bookieServiceInfo.length == 0) {
+ return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId);
+ }
+
+ BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo);
+ BookieServiceInfo bsi = new BookieServiceInfo();
+ List<BookieServiceInfo.Endpoint> endpoints = builder.getEndpointsList().stream()
+ .map(e -> {
+ BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
+ endpoint.setId(e.getId());
+ endpoint.setPort(e.getPort());
+ endpoint.setHost(e.getHost());
+ endpoint.setProtocol(e.getProtocol());
+ endpoint.setAuth(e.getAuthList());
+ endpoint.setExtensions(e.getExtensionsList());
+ return endpoint;
+ })
+ .collect(Collectors.toList());
+
+ bsi.setEndpoints(endpoints);
+ bsi.setProperties(builder.getPropertiesMap());
+
+ return bsi;
+
+ }
+
+ /**
+ * Extract the BookieId
+ * The path should look like /ledgers/available/bookieId
+ * or /ledgers/available/readonly/bookieId.
+ * But the prefix depends on the configuration.
+ * @param path
+ * @return the bookieId
+ */
+ private static String extractBookiedIdFromPath(String path) throws IOException {
+ // https://github.com/apache/bookkeeper/blob/
+ // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/
+ // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258
+ if (path == null) {
+ path = "";
+ }
+ int last = path.lastIndexOf("/");
+ if (last >= 0) {
+ return path.substring(last + 1);
+ } else {
+ throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node");
+ }
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index 38e2a33ef3f..d27a3df1a2f 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -24,13 +24,19 @@ import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
@@ -46,11 +52,13 @@ public class PulsarRegistrationClient implements RegistrationClient {
private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
+ private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
public PulsarRegistrationClient(MetadataStore store,
String ledgersRootPath) {
this.store = store;
this.ledgersRootPath = ledgersRootPath;
+ this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
// Following Bookie Network Address Changes is an expensive operation
// as it requires additional ZooKeeper watches
@@ -145,4 +153,32 @@ public class PulsarRegistrationClient implements RegistrationClient {
}
return newBookieAddrs;
}
+
+ @Override
+ public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(BookieId bookieId) {
+ String asWritable = bookieRegistrationPath + "/" + bookieId;
+
+ return bookieServiceInfoMetadataCache.get(asWritable)
+ .thenCompose((Optional<BookieServiceInfo> getResult) -> {
+ if (getResult.isPresent()) {
+ return CompletableFuture.completedFuture(new Versioned<>(getResult.get(),
+ new LongVersion(-1)));
+ } else {
+ return readBookieInfoAsReadonlyBookie(bookieId);
+ }
+ }
+ );
+ }
+
+ final CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
+ String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
+ return bookieServiceInfoMetadataCache.get(asReadonly)
+ .thenApply((Optional<BookieServiceInfo> getResultAsReadOnly) -> {
+ if (getResultAsReadOnly.isPresent()) {
+ return new Versioned<>(getResultAsReadOnly.get(), new LongVersion(-1));
+ } else {
+ throw new CompletionException(new BKException.BKBookieHandleNotAvailableException());
+ }
+ });
+ }
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
index 38195b230ce..914e70e4068 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -114,6 +116,63 @@ public class PulsarRegistrationClientTest extends BaseMetadataStoreTest {
assertEquals(addresses.size(), result.getValue().size());
}
+ @Test(dataProvider = "impl")
+ public void testGetBookieServiceInfo(String provider, Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
+ MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+ String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
+
+ @Cleanup
+ RegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, mock(AbstractConfiguration.class));
+
+ @Cleanup
+ RegistrationClient rc = new PulsarRegistrationClient(store, ledgersRoot);
+
+ List<BookieId> addresses = new ArrayList<>(prepareNBookies(10));
+ List<BookieServiceInfo> bookieServiceInfos = new ArrayList<>();
+ int port = 223;
+ for (BookieId address : addresses) {
+ BookieServiceInfo info = new BookieServiceInfo();
+ BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
+ endpoint.setAuth(Collections.emptyList());
+ endpoint.setExtensions(Collections.emptyList());
+ endpoint.setId("id");
+ endpoint.setHost("localhost");
+ endpoint.setPort(port++);
+ endpoint.setProtocol("bookie-rpc");
+ info.setEndpoints(Arrays.asList(endpoint));
+ bookieServiceInfos.add(info);
+ // some readonly, some writable
+ boolean readOnly = port % 2 == 0;
+ rm.registerBookie(address, readOnly, info);
+ }
+
+ int i = 0;
+ for (BookieId address : addresses) {
+ BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue();
+ compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(i++));
+ }
+
+ }
+
+ private void compareBookieServiceInfo(BookieServiceInfo a, BookieServiceInfo b) {
+ assertEquals(a.getProperties(), b.getProperties());
+ assertEquals(a.getEndpoints().size(), b.getEndpoints().size());
+ for (int i = 0; i < a.getEndpoints().size(); i++) {
+ BookieServiceInfo.Endpoint e1 = a.getEndpoints().get(i);
+ BookieServiceInfo.Endpoint e2 = b.getEndpoints().get(i);
+ assertEquals(e1.getHost(), e2.getHost());
+ assertEquals(e1.getPort(), e2.getPort());
+ assertEquals(e1.getId(), e2.getId());
+ assertEquals(e1.getProtocol(), e2.getProtocol());
+ assertEquals(e1.getExtensions(), e2.getExtensions());
+ assertEquals(e1.getAuth(), e2.getAuth());
+ }
+
+ }
+
@Test(dataProvider = "impl")
public void testGetAllBookies(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup