You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/12/06 09:11:48 UTC
[pulsar] branch branch-2.11 updated: [fix][broker] Fix duplicated schemas creation (#18701)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new b3a8cec595f [fix][broker] Fix duplicated schemas creation (#18701)
b3a8cec595f is described below
commit b3a8cec595f0519bf33cb0f008d5baef39cbf7be
Author: Penghui Li <pe...@apache.org>
AuthorDate: Tue Dec 6 16:16:29 2022 +0800
[fix][broker] Fix duplicated schemas creation (#18701)
(cherry picked from commit 390b1413624e6154faf50d4a083d7d6efc4d0265)
---
.../service/schema/BookkeeperSchemaStorage.java | 164 +++++++++++----------
.../service/schema/SchemaRegistryServiceImpl.java | 145 ++++++++++--------
.../java/org/apache/pulsar/schema/SchemaTest.java | 55 +++++++
.../common/protocol/schema/SchemaStorage.java | 16 ++
4 files changed, 243 insertions(+), 137 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 4451cab7c92..9094f94eb80 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -41,13 +41,16 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
@@ -115,6 +118,44 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
return putSchema(key, value, hash).thenApply(LongSchemaVersion::new);
}
+ @Override
+ public CompletableFuture<SchemaVersion> put(String key,
+ Function<CompletableFuture<List<CompletableFuture<StoredSchema>>>,
+ CompletableFuture<Pair<byte[], byte[]>>> fn) {
+ CompletableFuture<SchemaVersion> promise = new CompletableFuture<>();
+ put(key, fn, promise);
+ return promise;
+ }
+
+ private void put(String key,
+ Function<CompletableFuture<List<CompletableFuture<StoredSchema>>>,
+ CompletableFuture<Pair<byte[], byte[]>>> fn,
+ CompletableFuture<SchemaVersion> promise) {
+ CompletableFuture<Pair<Optional<LocatorEntry>, List<CompletableFuture<StoredSchema>>>> schemasWithLocator =
+ getAllWithLocator(key);
+ schemasWithLocator.thenCompose(pair ->
+ fn.apply(completedFuture(pair.getRight())).thenCompose(p -> {
+ // The schema is existed
+ if (p == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return putSchema(key, p.getLeft(), p.getRight(), pair.getLeft());
+ }).thenApply(version -> {
+ return version != null ? new LongSchemaVersion(version) : null;
+ })).whenComplete((v, ex) -> {
+ if (ex == null) {
+ promise.complete(v);
+ } else {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
+ put(key, fn, promise);
+ } else {
+ promise.completeExceptionally(ex);
+ }
+ }
+ });
+ }
+
@Override
public CompletableFuture<StoredSchema> get(String key, SchemaVersion version) {
if (version == SchemaVersion.Latest) {
@@ -127,30 +168,32 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
@Override
public CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key) {
- CompletableFuture<List<CompletableFuture<StoredSchema>>> result = new CompletableFuture<>();
- getLocator(key).thenAccept(locator -> {
+ return getAllWithLocator(key).thenApply(Pair::getRight);
+ }
+
+ private CompletableFuture<Pair<Optional<LocatorEntry>, List<CompletableFuture<StoredSchema>>>> getAllWithLocator(
+ String key) {
+ return getLocator(key).thenApply(locator -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Get all schemas - locator: {}", key, locator);
}
- if (!locator.isPresent()) {
- result.complete(Collections.emptyList());
- return;
+ if (locator.isEmpty()) {
+ return Pair.of(locator, Collections.emptyList());
}
SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator;
List<CompletableFuture<StoredSchema>> list = new ArrayList<>();
schemaLocator.getIndexList().forEach(indexEntry -> list.add(readSchemaEntry(indexEntry.getPosition())
- .thenApply(entry -> new StoredSchema
- (
- entry.getSchemaData().toByteArray(),
- new LongSchemaVersion(indexEntry.getVersion())
+ .thenApply(entry -> new StoredSchema
+ (
+ entry.getSchemaData().toByteArray(),
+ new LongSchemaVersion(indexEntry.getVersion())
+ )
)
- )
));
- result.complete(list);
+ return Pair.of(locator, list);
});
- return result;
}
CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
@@ -283,72 +326,29 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
@NotNull
private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] hash) {
- return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
-
- if (optLocatorEntry.isPresent()) {
+ return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry ->
+ putSchema(schemaId, data, hash, optLocatorEntry));
+ }
- SchemaStorageFormat.SchemaLocator locator = optLocatorEntry.get().locator;
+ private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] hash,
+ Optional<LocatorEntry> optLocatorEntry) {
+ if (optLocatorEntry.isPresent()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash);
- }
+ SchemaStorageFormat.SchemaLocator locator = optLocatorEntry.get().locator;
- //don't check the schema whether already exist
- return readSchemaEntry(locator.getIndexList().get(0).getPosition())
- .thenCompose(schemaEntry -> addNewSchemaEntryToStore(schemaId,
- locator.getIndexList(), data).thenCompose(
- position -> {
- CompletableFuture<Long> future = new CompletableFuture<>();
- updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)
- .thenAccept(future::complete)
- .exceptionally(ex -> {
- if (ex.getCause() instanceof BadVersionException) {
- // There was a race condition on the schema creation.
- // Since it has now been created,
- // retry the whole operation so that we have a chance to
- // recover without bubbling error
- putSchema(schemaId, data, hash)
- .thenAccept(future::complete)
- .exceptionally(ex2 -> {
- future.completeExceptionally(ex2);
- return null;
- });
- } else {
- // For other errors, just fail the operation
- future.completeExceptionally(ex);
- }
- return null;
- });
- return future;
- })
- );
- } else {
- // No schema was defined yet
- CompletableFuture<Long> future = new CompletableFuture<>();
- createNewSchema(schemaId, data, hash)
- .thenAccept(future::complete)
- .exceptionally(ex -> {
- if (ex.getCause() instanceof AlreadyExistsException
- || ex.getCause() instanceof BadVersionException) {
- // There was a race condition on the schema creation. Since it has now been created,
- // retry the whole operation so that we have a chance to recover without bubbling error
- // back to producer/consumer
- putSchema(schemaId, data, hash)
- .thenAccept(future::complete)
- .exceptionally(ex2 -> {
- future.completeExceptionally(ex2);
- return null;
- });
- } else {
- // For other errors, just fail the operation
- future.completeExceptionally(ex);
- }
- return null;
- });
-
- return future;
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash);
}
- });
+
+ //don't check the schema whether already exist
+ return readSchemaEntry(locator.getIndexList().get(0).getPosition())
+ .thenCompose(schemaEntry -> addNewSchemaEntryToStore(schemaId,
+ locator.getIndexList(), data).thenCompose(
+ position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash))
+ );
+ } else {
+ return createNewSchema(schemaId, data, hash);
+ }
}
private CompletableFuture<Long> createNewSchema(String schemaId, byte[] data, byte[] hash) {
@@ -481,7 +481,21 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
.addAllIndex(
concat(locator.getIndexList(), newArrayList(info))
).build(), locatorEntry.version
- ).thenApply(ignore -> nextVersion);
+ ).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ log.warn("[{}] Failed to update schema locator with position {}", schemaId, position, cause);
+ if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
+ bookKeeper.asyncDeleteLedger(position.getLedgerId(), new AsyncCallback.DeleteCallback() {
+ @Override
+ public void deleteComplete(int rc, Object ctx) {
+ if (rc != BKException.Code.OK) {
+ log.warn("[{}] Failed to delete ledger {} after updating schema locator failed, rc: {}",
+ schemaId, position.getLedgerId(), rc);
+ }
+ }
+ }, null);
+ }
+ });
}
@NotNull
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 5fbe91e3bcd..7c0121a5bb6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -45,6 +45,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
@@ -153,21 +155,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
long start = this.clock.millis();
return schemaStorage.getAll(schemaId)
- .thenApply(schemas -> {
- List<CompletableFuture<SchemaAndMetadata>> futures = schemas.stream()
- .map(future -> future.thenCompose(stored ->
- Functions.bytesToSchemaInfo(stored.data)
- .thenApply(Functions::schemaInfoToSchema)
- .thenApply(schema ->
- new SchemaAndMetadata(schemaId, schema, stored.version)
- )
- ))
- .collect(Collectors.toList());
- if (log.isDebugEnabled()) {
- log.debug("[{}] {} schemas is found", schemaId, futures.size());
- }
- return futures;
- })
+ .thenCompose(schemas -> convertToSchemaAndMetadata(schemaId, schemas))
.whenComplete((v, t) -> {
if (t != null) {
this.stats.recordGetFailed(schemaId);
@@ -177,60 +165,89 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
});
}
+ private CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> convertToSchemaAndMetadata(String schemaId,
+ List<CompletableFuture<StoredSchema>> schemas) {
+ List<CompletableFuture<SchemaAndMetadata>> list = schemas.stream()
+ .map(future -> future.thenCompose(stored ->
+ Functions.bytesToSchemaInfo(stored.data)
+ .thenApply(Functions::schemaInfoToSchema)
+ .thenApply(schema ->
+ new SchemaAndMetadata(schemaId, schema, stored.version)
+ )
+ ))
+ .collect(Collectors.toList());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] {} schemas is found", schemaId, list.size());
+ }
+ return CompletableFuture.completedFuture(list);
+ }
+
@Override
@NotNull
public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy) {
- return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
- getSchemaVersionBySchemaData(schemaAndMetadataList, schema).thenCompose(schemaVersion -> {
- if (schemaVersion != null) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Schema is already exists", schemaId);
- }
- return CompletableFuture.completedFuture(schemaVersion);
- }
- CompletableFuture<Void> checkCompatibilityFuture = new CompletableFuture<>();
- if (schemaAndMetadataList.size() != 0) {
- if (isTransitiveStrategy(strategy)) {
- checkCompatibilityFuture =
- checkCompatibilityWithAll(schemaId, schema, strategy, schemaAndMetadataList);
- } else {
- checkCompatibilityFuture = checkCompatibilityWithLatest(schemaId, schema, strategy);
- }
- } else {
- checkCompatibilityFuture.complete(null);
- }
- return checkCompatibilityFuture.thenCompose(v -> {
- byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
- SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
- .setType(Functions.convertFromDomainType(schema.getType()))
- .setSchema(ByteString.copyFrom(schema.getData()))
- .setSchemaId(schemaId)
- .setUser(schema.getUser())
- .setDeleted(false)
- .setTimestamp(clock.millis())
- .addAllProps(toPairs(schema.getProps()))
- .build();
-
- long start = this.clock.millis();
-
- return schemaStorage
- .put(schemaId, info.toByteArray(), context)
- .whenComplete((__, t) -> {
- if (t != null) {
- log.error("[{}] Put schema failed", schemaId);
- this.stats.recordPutFailed(schemaId);
+ MutableLong start = new MutableLong(0);
+ CompletableFuture<SchemaVersion> promise = new CompletableFuture<>();
+ schemaStorage.put(schemaId,
+ schemasFuture -> schemasFuture
+ .thenCompose(schemaFutureList -> trimDeletedSchemaAndGetList(schemaId,
+ convertToSchemaAndMetadata(schemaId, schemaFutureList)))
+ .thenCompose(schemaAndMetadataList -> getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
+ .thenCompose(schemaVersion -> {
+ if (schemaVersion != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Schema is already exists", schemaId);
+ }
+ promise.complete(schemaVersion);
+ return CompletableFuture.completedFuture(null);
+ }
+ CompletableFuture<Void> checkCompatibilityFuture = new CompletableFuture<>();
+ if (schemaAndMetadataList.size() != 0) {
+ if (isTransitiveStrategy(strategy)) {
+ checkCompatibilityFuture =
+ checkCompatibilityWithAll(schemaId, schema, strategy, schemaAndMetadataList);
} else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Put schema finished", schemaId);
- }
- this.stats.recordPutLatency(schemaId, this.clock.millis() - start);
+ checkCompatibilityFuture = checkCompatibilityWithLatest(schemaId, schema, strategy);
}
+ } else {
+ checkCompatibilityFuture.complete(null);
+ }
+ return checkCompatibilityFuture.thenCompose(v -> {
+ byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
+ SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
+ .setType(Functions.convertFromDomainType(schema.getType()))
+ .setSchema(ByteString.copyFrom(schema.getData()))
+ .setSchemaId(schemaId)
+ .setUser(schema.getUser())
+ .setDeleted(false)
+ .setTimestamp(clock.millis())
+ .addAllProps(toPairs(schema.getProps()))
+ .build();
+
+ start.setValue(this.clock.millis());
+ return CompletableFuture.completedFuture(Pair.of(info.toByteArray(), context));
});
-
+ }))).whenComplete((v, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Put schema failed", schemaId, ex);
+ if (start.getValue() != 0) {
+ this.stats.recordPutFailed(schemaId);
+ }
+ promise.completeExceptionally(ex);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Put schema finished", schemaId);
+ }
+ // The schema storage will return null schema version if no schema is persisted to the storage
+ if (v != null) {
+ promise.complete(v);
+ if (start.getValue() != 0) {
+ this.stats.recordPutLatency(schemaId, this.clock.millis() - start.getValue());
+ }
+ }
+ }
});
-
- }));
+ return promise;
}
@Override
@@ -525,9 +542,13 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
}
public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
+ CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemaFutureList = getAllSchemas(schemaId);
+ return trimDeletedSchemaAndGetList(schemaId, schemaFutureList);
+ }
+ private CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId,
+ CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemaFutureList) {
CompletableFuture<List<SchemaAndMetadata>> schemaResult = new CompletableFuture<>();
- CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemaFutureList = getAllSchemas(schemaId);
schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, ex) -> {
List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() : schemaList;
if (ex != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 0620326e94d..d3604c785b8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -34,12 +34,16 @@ import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -1269,6 +1273,57 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
assertThrows(SchemaSerializationException.class, message2::getValue);
}
+ @Test
+ public void testCreateSchemaInParallel() throws Exception {
+ final String namespace = "test-namespace-" + randomName(16);
+ String ns = PUBLIC_TENANT + "/" + namespace;
+ admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+
+ final String topic = getTopicName(ns, "testCreateSchemaInParallel");
+ ExecutorService executor = Executors.newFixedThreadPool(16);
+ List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = new ArrayList<>(16);
+ CountDownLatch latch = new CountDownLatch(16);
+ for (int i = 0; i < 16; i++) {
+ executor.execute(() -> {
+ producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topic).createAsync());
+ latch.countDown();
+ });
+ }
+ latch.await();
+ FutureUtil.waitForAll(producers).join();
+ assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+ producers.clear();
+
+ List<CompletableFuture<Producer<Schemas.PersonThree>>> producers2 = new ArrayList<>(16);
+ CountDownLatch latch2 = new CountDownLatch(16);
+ for (int i = 0; i < 16; i++) {
+ executor.execute(() -> {
+ producers2.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonThree.class))
+ .topic(topic).createAsync());
+ latch2.countDown();
+ });
+ }
+ latch2.await();
+ FutureUtil.waitForAll(producers2).join();
+ assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);
+ producers.forEach(p -> {
+ try {
+ p.join().close();
+ } catch (Exception ignore) {
+ }
+ });
+ producers2.forEach(p -> {
+ try {
+ p.join().close();
+ } catch (Exception ignore) {
+ }
+ });
+ producers.clear();
+ producers2.clear();
+ executor.shutdownNow();
+ }
+
@EqualsAndHashCode
static class User implements Serializable {
private String name;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
index c34154bc827..d93672a58db 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.common.protocol.schema;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Pair;
/**
* Schema storage.
@@ -28,6 +30,20 @@ public interface SchemaStorage {
CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
+ /**
+ * Put the schema to the schema storage.
+ *
+ * @param key The schema ID
+ * @param fn The function to calculate the value and hash that need to put to the schema storage
+ * The input of the function is all the existing schemas that used to do the schemas compatibility check
+ * @return The schema version of the stored schema
+ */
+ default CompletableFuture<SchemaVersion> put(String key,
+ Function<CompletableFuture<List<CompletableFuture<StoredSchema>>>,
+ CompletableFuture<Pair<byte[], byte[]>>> fn) {
+ return fn.apply(getAll(key)).thenCompose(pair -> put(key, pair.getLeft(), pair.getRight()));
+ }
+
CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key);