You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/10/04 06:22:53 UTC

[pulsar] branch branch-2.10 updated: Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 850c9448a5a Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914)
850c9448a5a is described below

commit 850c9448a5ac32e2f94988b8bf80955c93ef9d6c
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Tue Oct 4 08:22:03 2022 +0200

    Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914)
    
    This reverts commit 8d7ac33751c62383b510a04ec223981bd70cd4db.
    
    (cherry picked from commit 9d6c34ea5d77bb96ecc21b1ec3a18fa4b730e7bd)
---
 .../bookkeeper/BookieServiceInfoSerde.java         | 55 +-------------------
 .../bookkeeper/PulsarRegistrationClient.java       | 36 -------------
 .../bookkeeper/PulsarRegistrationClientTest.java   | 59 ----------------------
 3 files changed, 2 insertions(+), 148 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
index b7e3024b637..78a33179e76 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.discover.BookieServiceInfo;
-import org.apache.bookkeeper.discover.BookieServiceInfoUtils;
 import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.Stat;
@@ -64,57 +63,7 @@ public class BookieServiceInfoSerde implements MetadataSerde<BookieServiceInfo>
     }
 
     @Override
-    public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException {
-        // see https://github.com/apache/bookkeeper/blob/
-        // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/
-        // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311
-        String bookieId = extractBookiedIdFromPath(path);
-        if (bookieServiceInfo == null || bookieServiceInfo.length == 0) {
-            return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId);
-        }
-
-        BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo);
-        BookieServiceInfo bsi = new BookieServiceInfo();
-        List<BookieServiceInfo.Endpoint> endpoints = builder.getEndpointsList().stream()
-                .map(e -> {
-                    BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
-                    endpoint.setId(e.getId());
-                    endpoint.setPort(e.getPort());
-                    endpoint.setHost(e.getHost());
-                    endpoint.setProtocol(e.getProtocol());
-                    endpoint.setAuth(e.getAuthList());
-                    endpoint.setExtensions(e.getExtensionsList());
-                    return endpoint;
-                })
-                .collect(Collectors.toList());
-
-        bsi.setEndpoints(endpoints);
-        bsi.setProperties(builder.getPropertiesMap());
-
-        return bsi;
-
-    }
-
-    /**
-     * Extract the BookieId
-     * The path should look like /ledgers/available/bookieId
-     * or /ledgers/available/readonly/bookieId.
-     * But the prefix depends on the configuration.
-     * @param path
-     * @return the bookieId
-     */
-    private static String extractBookiedIdFromPath(String path) throws IOException {
-        // https://github.com/apache/bookkeeper/blob/
-        // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/
-        // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258
-        if (path == null) {
-            path = "";
-        }
-        int last = path.lastIndexOf("/");
-        if (last >= 0) {
-            return path.substring(last + 1);
-        } else {
-            throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node");
-        }
+    public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException {
+        return null;
     }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index 1c692404318..52b50e3ea4b 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -25,21 +25,15 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
@@ -55,14 +49,12 @@ public class PulsarRegistrationClient implements RegistrationClient {
 
     private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
     private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
-    private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
     private final ScheduledExecutorService executor;
 
     public PulsarRegistrationClient(MetadataStore store,
                                     String ledgersRootPath) {
         this.store = store;
         this.ledgersRootPath = ledgersRootPath;
-        this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
 
         // Following Bookie Network Address Changes is an expensive operation
         // as it requires additional ZooKeeper watches
@@ -161,32 +153,4 @@ public class PulsarRegistrationClient implements RegistrationClient {
         }
         return newBookieAddrs;
     }
-
-    @Override
-    public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(BookieId bookieId) {
-        String asWritable = bookieRegistrationPath + "/" + bookieId;
-
-        return bookieServiceInfoMetadataCache.get(asWritable)
-                .thenCompose((Optional<BookieServiceInfo> getResult) -> {
-                    if (getResult.isPresent()) {
-                        return CompletableFuture.completedFuture(new Versioned<>(getResult.get(),
-                                    new LongVersion(-1)));
-                    } else {
-                        return readBookieInfoAsReadonlyBookie(bookieId);
-                    }
-                }
-        );
-    }
-
-    final CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
-        String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
-        return bookieServiceInfoMetadataCache.get(asReadonly)
-                .thenApply((Optional<BookieServiceInfo> getResultAsReadOnly) -> {
-                    if (getResultAsReadOnly.isPresent()) {
-                        return new Versioned<>(getResultAsReadOnly.get(), new LongVersion(-1));
-                    } else {
-                        throw new CompletionException(new BKException.BKBookieHandleNotAvailableException());
-                    }
-                });
-    }
 }
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
index 914e70e4068..38195b230ce 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
@@ -23,8 +23,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.mockito.Mockito.mock;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -116,63 +114,6 @@ public class PulsarRegistrationClientTest extends BaseMetadataStoreTest {
         assertEquals(addresses.size(), result.getValue().size());
     }
 
-    @Test(dataProvider = "impl")
-    public void testGetBookieServiceInfo(String provider, Supplier<String> urlSupplier) throws Exception {
-        @Cleanup
-        MetadataStoreExtended store =
-                MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
-
-        String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
-
-        @Cleanup
-        RegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, mock(AbstractConfiguration.class));
-
-        @Cleanup
-        RegistrationClient rc = new PulsarRegistrationClient(store, ledgersRoot);
-
-        List<BookieId> addresses = new ArrayList<>(prepareNBookies(10));
-        List<BookieServiceInfo> bookieServiceInfos = new ArrayList<>();
-        int port = 223;
-        for (BookieId address : addresses) {
-            BookieServiceInfo info = new BookieServiceInfo();
-            BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
-            endpoint.setAuth(Collections.emptyList());
-            endpoint.setExtensions(Collections.emptyList());
-            endpoint.setId("id");
-            endpoint.setHost("localhost");
-            endpoint.setPort(port++);
-            endpoint.setProtocol("bookie-rpc");
-            info.setEndpoints(Arrays.asList(endpoint));
-            bookieServiceInfos.add(info);
-            // some readonly, some writable
-            boolean readOnly = port % 2 == 0;
-            rm.registerBookie(address, readOnly, info);
-        }
-
-        int i = 0;
-        for (BookieId address : addresses) {
-            BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue();
-            compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(i++));
-        }
-
-    }
-
-    private void compareBookieServiceInfo(BookieServiceInfo a, BookieServiceInfo b) {
-        assertEquals(a.getProperties(), b.getProperties());
-        assertEquals(a.getEndpoints().size(), b.getEndpoints().size());
-        for (int i = 0; i < a.getEndpoints().size(); i++) {
-            BookieServiceInfo.Endpoint e1 = a.getEndpoints().get(i);
-            BookieServiceInfo.Endpoint e2 = b.getEndpoints().get(i);
-            assertEquals(e1.getHost(), e2.getHost());
-            assertEquals(e1.getPort(), e2.getPort());
-            assertEquals(e1.getId(), e2.getId());
-            assertEquals(e1.getProtocol(), e2.getProtocol());
-            assertEquals(e1.getExtensions(), e2.getExtensions());
-            assertEquals(e1.getAuth(), e2.getAuth());
-        }
-
-    }
-
     @Test(dataProvider = "impl")
     public void testGetAllBookies(String provider, Supplier<String> urlSupplier) throws Exception {
         @Cleanup