You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/05/12 21:17:20 UTC
[pulsar] branch master updated: Migrated BookkeeperSchemaStorage to use MetadataStore (#10545)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 55af397 Migrated BookkeeperSchemaStorage to use MetadataStore (#10545)
55af397 is described below
commit 55af39759d78db90eaa26008ce21f14063b51130
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed May 12 14:16:37 2021 -0700
Migrated BookkeeperSchemaStorage to use MetadataStore (#10545)
* Migrated BookkeeperSchemaStorage to use MetadataStore
* Fixed test
* Fixed checkstyle
* Fixed tests
---
.../service/schema/BookkeeperSchemaStorage.java | 117 +++++++--------------
.../schema/BookkeeperSchemaStorageFactory.java | 6 +-
.../schema/BookkeeperSchemaStorageTest.java | 4 +-
.../broker/service/schema/SchemaServiceTest.java | 1 -
4 files changed, 43 insertions(+), 85 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 3009236..f47d0f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -25,6 +25,8 @@ import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
+import static org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
+import static org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -46,7 +48,6 @@ import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
-import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
@@ -55,14 +56,9 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataSerde;
+import org.apache.pulsar.metadata.api.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,12 +66,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
private static final Logger log = LoggerFactory.getLogger(BookkeeperSchemaStorage.class);
private static final String SchemaPath = "/schemas";
- private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
private static final byte[] LedgerPassword = "".getBytes();
+ private final MetadataStore store;
private final PulsarService pulsar;
- private final ZooKeeper zooKeeper;
- private final ZooKeeperCache localZkCache;
+ private final MetadataCache<SchemaStorageFormat.SchemaLocator> locatorEntryCache;
+
private final ServiceConfiguration config;
private BookKeeper bookKeeper;
@@ -85,20 +81,19 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
@VisibleForTesting
BookkeeperSchemaStorage(PulsarService pulsar) {
this.pulsar = pulsar;
- this.localZkCache = pulsar.getLocalZkCache();
- this.zooKeeper = localZkCache.getZooKeeper();
+ this.store = pulsar.getLocalMetadataStore();
this.config = pulsar.getConfiguration();
- }
+ this.locatorEntryCache = store.getMetadataCache(new MetadataSerde<SchemaStorageFormat.SchemaLocator>() {
+ @Override
+ public byte[] serialize(SchemaStorageFormat.SchemaLocator value) {
+ return value.toByteArray();
+ }
- @VisibleForTesting
- public void init() throws KeeperException, InterruptedException {
- try {
- if (zooKeeper.exists(SchemaPath, false) == null) {
- zooKeeper.create(SchemaPath, new byte[]{}, Acl, CreateMode.PERSISTENT);
+ @Override
+ public SchemaStorageFormat.SchemaLocator deserialize(byte[] content) throws IOException {
+ return SchemaStorageFormat.SchemaLocator.parseFrom(content);
}
- } catch (KeeperException.NodeExistsException error) {
- // race on startup, ignore.
- }
+ });
}
@Override
@@ -158,10 +153,6 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
return getSchemaLocator(getSchemaPath(key));
}
- public void clearLocatorCache(String key) {
- localZkCache.invalidate(getSchemaPath(key));
- }
-
public List<Long> getSchemaLedgerList(String key) throws IOException {
Optional<LocatorEntry> locatorEntry = null;
try {
@@ -302,7 +293,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)
.thenAccept(future::complete)
.exceptionally(ex -> {
- if (ex.getCause() instanceof KeeperException.BadVersionException) {
+ if (ex.getCause() instanceof BadVersionException) {
// There was a race condition on the schema creation.
// Since it has now been created,
// retry the whole operation so that we have a chance to
@@ -328,8 +319,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
createNewSchema(schemaId, data, hash)
.thenAccept(future::complete)
.exceptionally(ex -> {
- if (ex.getCause() instanceof NodeExistsException
- || ex.getCause() instanceof KeeperException.BadVersionException) {
+ if (ex.getCause() instanceof AlreadyExistsException
+ || ex.getCause() instanceof BadVersionException) {
// There was a race condition on the schema creation. Since it has now been created,
// retry the whole operation so that we have a chance to recover without bubbling error
// back to producer/consumer
@@ -411,15 +402,13 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
});
FutureUtil.waitForAll(deleteFutures).whenComplete((v, e) -> {
final String path = getSchemaPath(schemaId);
- ZkUtils.asyncDeleteFullPathOptimistic(zooKeeper, path, -1, (rc, path1, ctx) -> {
- if (rc != Code.OK.intValue()) {
- future.completeExceptionally(KeeperException.create(Code.get(rc)));
- } else {
- clearLocatorCache(getSchemaPath(schemaId));
- future.complete(version);
- }
- }, path);
-
+ store.delete(path, Optional.empty())
+ .thenRun(() -> {
+ future.complete(version);
+ }).exceptionally(ex1 -> {
+ future.completeExceptionally(ex1);
+ return null;
+ });
});
}
});
@@ -468,7 +457,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
.setInfo(info)
.addAllIndex(
concat(locator.getIndexList(), newArrayList(info))
- ).build(), locatorEntry.zkZnodeVersion
+ ).build(), locatorEntry.version
).thenApply(ignore -> nextVersion);
}
@@ -518,42 +507,21 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
@NotNull
private CompletableFuture<Void> updateSchemaLocator(String id,
- SchemaStorageFormat.SchemaLocator schema, int version) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- zooKeeper.setData(id, schema.toByteArray(), version, (rc, path, ctx, stat) -> {
- Code code = Code.get(rc);
- if (code != Code.OK) {
- future.completeExceptionally(KeeperException.create(code));
- } else {
- future.complete(null);
- }
- }, null);
- return future;
+ SchemaStorageFormat.SchemaLocator schema, long version) {
+ return store.put(id, schema.toByteArray(), Optional.of(version)).thenApply(__ -> null);
}
@NotNull
private CompletableFuture<LocatorEntry> createSchemaLocator(String id, SchemaStorageFormat.SchemaLocator locator) {
- CompletableFuture<LocatorEntry> future = new CompletableFuture<>();
-
- ZkUtils.asyncCreateFullPathOptimistic(zooKeeper, id, locator.toByteArray(), Acl,
- CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
- Code code = Code.get(rc);
- if (code != Code.OK) {
- future.completeExceptionally(KeeperException.create(code));
- } else {
- // Newly created z-node will have version 0
- future.complete(new LocatorEntry(locator, 0));
- }
- }, null);
-
- return future;
+ return store.put(id, locator.toByteArray(), Optional.of(-1L))
+ .thenApply(stat -> new LocatorEntry(locator, stat.getVersion()));
}
@NotNull
private CompletableFuture<Optional<LocatorEntry>> getSchemaLocator(String schema) {
- return localZkCache.getEntryAsync(schema, new SchemaLocatorDeserializer()).thenApply(optional ->
- optional.map(entry -> new LocatorEntry(entry.getKey(), entry.getValue().getVersion()))
- );
+ return locatorEntryCache.getWithStats(schema)
+ .thenApply(o ->
+ o.map(r -> new LocatorEntry(r.getValue(), r.getStat().getVersion())));
}
@NotNull
@@ -692,20 +660,13 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
}
}
- static class SchemaLocatorDeserializer implements ZooKeeperCache.Deserializer<SchemaStorageFormat.SchemaLocator> {
- @Override
- public SchemaStorageFormat.SchemaLocator deserialize(String key, byte[] content) throws Exception {
- return SchemaStorageFormat.SchemaLocator.parseFrom(content);
- }
- }
-
static class LocatorEntry {
final SchemaStorageFormat.SchemaLocator locator;
- final Integer zkZnodeVersion;
+ final long version;
- LocatorEntry(SchemaStorageFormat.SchemaLocator locator, Integer zkZnodeVersion) {
+ LocatorEntry(SchemaStorageFormat.SchemaLocator locator, long version) {
this.locator = locator;
- this.zkZnodeVersion = zkZnodeVersion;
+ this.version = version;
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
index 8304ed1..f3bf010 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java
@@ -26,9 +26,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaStorage;
public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory {
@Override
@NotNull
- public SchemaStorage create(PulsarService pulsar) throws Exception {
- BookkeeperSchemaStorage service = new BookkeeperSchemaStorage(pulsar);
- service.init();
- return service;
+ public SchemaStorage create(PulsarService pulsar) {
+ return new BookkeeperSchemaStorage(pulsar);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
index be16ec9..6d89fef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageTest.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.schema.LongSchemaVersion;
-import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.Test;
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.bkException;
@@ -61,7 +61,7 @@ public class BookkeeperSchemaStorageTest {
byte[] versionBytesPost240 = bbPost240.array();
PulsarService mockPulsarService = mock(PulsarService.class);
- when(mockPulsarService.getLocalZkCache()).thenReturn(mock(LocalZooKeeperCache.class));
+ when(mockPulsarService.getLocalMetadataStore()).thenReturn(mock(MetadataStoreExtended.class));
BookkeeperSchemaStorage schemaStorage = new BookkeeperSchemaStorage(mockPulsarService);
assertEquals(new LongSchemaVersion(version), schemaStorage.versionFromBytes(versionBytesPre240));
assertEquals(new LongSchemaVersion(version), schemaStorage.versionFromBytes(versionBytesPost240));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 328db87..931a7f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -82,7 +82,6 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
conf.setSchemaRegistryStorageClassName("org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory");
super.internalSetup();
BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar);
- storage.init();
storage.start();
Map<SchemaType, SchemaCompatibilityCheck> checkMap = new HashMap<>();
checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck());