You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/12 21:17:20 UTC

[pulsar] branch master updated: Migrated BookkeeperSchemaStorage to use MetadataStore (#10545)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 55af397  Migrated BookkeeperSchemaStorage to use MetadataStore (#10545)
55af397 is described below

commit 55af39759d78db90eaa26008ce21f14063b51130
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed May 12 14:16:37 2021 -0700

    Migrated BookkeeperSchemaStorage to use MetadataStore (#10545)
    
    * Migrated BookkeeperSchemaStorage to use MetadataStore
    
    * Fixed test
    
    * Fixed checkstyle
    
    * Fixed tests
---
 .../service/schema/BookkeeperSchemaStorage.java    | 117 +++++++--------------
 .../schema/BookkeeperSchemaStorageFactory.java     |   6 +-
 .../schema/BookkeeperSchemaStorageTest.java        |   4 +-
 .../broker/service/schema/SchemaServiceTest.java   |   1 -
 4 files changed, 43 insertions(+), 85 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 3009236..f47d0f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -25,6 +25,8 @@ import static java.util.Objects.isNull;
 import static java.util.Objects.nonNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
+import static org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
+import static org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -46,7 +48,6 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
@@ -55,14 +56,9 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.protocol.schema.StoredSchema;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataSerde;
+import org.apache.pulsar.metadata.api.MetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,12 +66,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     private static final Logger log = LoggerFactory.getLogger(BookkeeperSchemaStorage.class);
 
     private static final String SchemaPath = "/schemas";
-    private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
     private static final byte[] LedgerPassword = "".getBytes();
 
+    private final MetadataStore store;
     private final PulsarService pulsar;
-    private final ZooKeeper zooKeeper;
-    private final ZooKeeperCache localZkCache;
+    private final MetadataCache<SchemaStorageFormat.SchemaLocator> locatorEntryCache;
+
     private final ServiceConfiguration config;
     private BookKeeper bookKeeper;
 
@@ -85,20 +81,19 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     @VisibleForTesting
     BookkeeperSchemaStorage(PulsarService pulsar) {
         this.pulsar = pulsar;
-        this.localZkCache = pulsar.getLocalZkCache();
-        this.zooKeeper = localZkCache.getZooKeeper();
+        this.store = pulsar.getLocalMetadataStore();
         this.config = pulsar.getConfiguration();
-    }
+        this.locatorEntryCache = store.getMetadataCache(new MetadataSerde<SchemaStorageFormat.SchemaLocator>() {
+            @Override
+            public byte[] serialize(SchemaStorageFormat.SchemaLocator value) {
+                return value.toByteArray();
+            }
 
-    @VisibleForTesting
-    public void init() throws KeeperException, InterruptedException {
-        try {
-            if (zooKeeper.exists(SchemaPath, false) == null) {
-                zooKeeper.create(SchemaPath, new byte[]{}, Acl, CreateMode.PERSISTENT);
+            @Override
+            public SchemaStorageFormat.SchemaLocator deserialize(byte[] content) throws IOException {
+                return SchemaStorageFormat.SchemaLocator.parseFrom(content);
             }
-        } catch (KeeperException.NodeExistsException error) {
-            // race on startup, ignore.
-        }
+        });
     }
 
     @Override
@@ -158,10 +153,6 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         return getSchemaLocator(getSchemaPath(key));
     }
 
-    public void clearLocatorCache(String key) {
-        localZkCache.invalidate(getSchemaPath(key));
-    }
-
     public List<Long> getSchemaLedgerList(String key) throws IOException {
         Optional<LocatorEntry> locatorEntry = null;
         try {
@@ -302,7 +293,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                                     updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)
                                             .thenAccept(future::complete)
                                             .exceptionally(ex -> {
-                                                if (ex.getCause() instanceof KeeperException.BadVersionException) {
+                                                if (ex.getCause() instanceof BadVersionException) {
                                                     // There was a race condition on the schema creation.
                                                     // Since it has now been created,
                                                     // retry the whole operation so that we have a chance to
@@ -328,8 +319,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                 createNewSchema(schemaId, data, hash)
                         .thenAccept(future::complete)
                         .exceptionally(ex -> {
-                            if (ex.getCause() instanceof NodeExistsException
-                                    || ex.getCause() instanceof KeeperException.BadVersionException) {
+                            if (ex.getCause() instanceof AlreadyExistsException
+                                    || ex.getCause() instanceof BadVersionException) {
                                 // There was a race condition on the schema creation. Since it has now been created,
                                 // retry the whole operation so that we have a chance to recover without bubbling error
                                 // back to producer/consumer
@@ -411,15 +402,13 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                         });
                         FutureUtil.waitForAll(deleteFutures).whenComplete((v, e) -> {
                             final String path = getSchemaPath(schemaId);
-                            ZkUtils.asyncDeleteFullPathOptimistic(zooKeeper, path, -1, (rc, path1, ctx) -> {
-                                if (rc != Code.OK.intValue()) {
-                                    future.completeExceptionally(KeeperException.create(Code.get(rc)));
-                                } else {
-                                    clearLocatorCache(getSchemaPath(schemaId));
-                                    future.complete(version);
-                                }
-                            }, path);
-
+                            store.delete(path, Optional.empty())
+                                    .thenRun(() -> {
+                                        future.complete(version);
+                                    }).exceptionally(ex1 -> {
+                                future.completeExceptionally(ex1);
+                                return null;
+                            });
                         });
                     }
                 });
@@ -468,7 +457,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                 .setInfo(info)
                 .addAllIndex(
                         concat(locator.getIndexList(), newArrayList(info))
-                ).build(), locatorEntry.zkZnodeVersion
+                ).build(), locatorEntry.version
         ).thenApply(ignore -> nextVersion);
     }
 
@@ -518,42 +507,21 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
 
     @NotNull
     private CompletableFuture<Void> updateSchemaLocator(String id,
-                                                        SchemaStorageFormat.SchemaLocator schema, int version) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        zooKeeper.setData(id, schema.toByteArray(), version, (rc, path, ctx, stat) -> {
-            Code code = Code.get(rc);
-            if (code != Code.OK) {
-                future.completeExceptionally(KeeperException.create(code));
-            } else {
-                future.complete(null);
-            }
-        }, null);
-        return future;
+                                                        SchemaStorageFormat.SchemaLocator schema, long version) {
+        return store.put(id, schema.toByteArray(), Optional.of(version)).thenApply(__ -> null);
     }
 
     @NotNull
     private CompletableFuture<LocatorEntry> createSchemaLocator(String id, SchemaStorageFormat.SchemaLocator locator) {
-        CompletableFuture<LocatorEntry> future = new CompletableFuture<>();
-
-        ZkUtils.asyncCreateFullPathOptimistic(zooKeeper, id, locator.toByteArray(), Acl,
-                CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
-                    Code code = Code.get(rc);
-                    if (code != Code.OK) {
-                        future.completeExceptionally(KeeperException.create(code));
-                    } else {
-                        // Newly created z-node will have version 0
-                        future.complete(new LocatorEntry(locator, 0));
-                    }
-                }, null);
-
-        return future;
+        return store.put(id, locator.toByteArray(), Optional.of(-1L))
+                .thenApply(stat -> new LocatorEntry(locator, stat.getVersion()));
     }
 
     @NotNull
     private CompletableFuture<Optional<LocatorEntry>> getSchemaLocator(String schema) {
-        return localZkCache.getEntryAsync(schema, new SchemaLocatorDeserializer()).thenApply(optional ->
-            optional.map(entry -> new LocatorEntry(entry.getKey(), entry.getValue().getVersion()))
-        );
+        return locatorEntryCache.getWithStats(schema)
+                .thenApply(o ->
+                        o.map(r -> new LocatorEntry(r.getValue(), r.getStat().getVersion())));
     }
 
     @NotNull
@@ -692,20 +660,13 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         }
     }
 
-    static class SchemaLocatorDeserializer implements ZooKeeperCache.Deserializer<SchemaStorageFormat.SchemaLocator> {
-        @Override
-        public SchemaStorageFormat.SchemaLocator deserialize(String key, byte[] content) throws Exception {
-            return SchemaStorageFormat.SchemaLocator.parseFrom(content);
-        }
-    }
-
     static class LocatorEntry {
         final SchemaStorageFormat.SchemaLocator locator;
-        final Integer zkZnodeVersion;
+        final long version;
 
-        LocatorEntry(SchemaStorageFormat.SchemaLocator locator, Integer zkZnodeVersion) {
+        LocatorEntry(SchemaStorageFormat.SchemaLocator locator, long version) {
             this.locator = locator;
-            this.zkZnodeVersion = zkZnodeVersion;
+            this.version = version;
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
index 8304ed1..f3bf010 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
@@ -26,9 +26,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory {
     @Override
     @NotNull
-    public SchemaStorage create(PulsarService pulsar) throws Exception {
-        BookkeeperSchemaStorage service = new BookkeeperSchemaStorage(pulsar);
-        service.init();
-        return service;
+    public SchemaStorage create(PulsarService pulsar) {
+        return new BookkeeperSchemaStorage(pulsar);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
index be16ec9..6d89fef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
-import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.testng.annotations.Test;
 
 import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.bkException;
@@ -61,7 +61,7 @@ public class BookkeeperSchemaStorageTest {
         byte[] versionBytesPost240 = bbPost240.array();
 
         PulsarService mockPulsarService = mock(PulsarService.class);
-        when(mockPulsarService.getLocalZkCache()).thenReturn(mock(LocalZooKeeperCache.class));
+        when(mockPulsarService.getLocalMetadataStore()).thenReturn(mock(MetadataStoreExtended.class));
         BookkeeperSchemaStorage schemaStorage = new BookkeeperSchemaStorage(mockPulsarService);
         assertEquals(new LongSchemaVersion(version), schemaStorage.versionFromBytes(versionBytesPre240));
         assertEquals(new LongSchemaVersion(version), schemaStorage.versionFromBytes(versionBytesPost240));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 328db87..931a7f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -82,7 +82,6 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
         conf.setSchemaRegistryStorageClassName("org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory");
         super.internalSetup();
         BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar);
-        storage.init();
         storage.start();
         Map<SchemaType, SchemaCompatibilityCheck> checkMap = new HashMap<>();
         checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck());