You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/12/06 09:11:48 UTC

[pulsar] branch branch-2.11 updated: [fix][broker] Fix duplicated schemas creation (#18701)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new b3a8cec595f [fix][broker] Fix duplicated schemas creation (#18701)
b3a8cec595f is described below

commit b3a8cec595f0519bf33cb0f008d5baef39cbf7be
Author: Penghui Li <pe...@apache.org>
AuthorDate: Tue Dec 6 16:16:29 2022 +0800

    [fix][broker] Fix duplicated schemas creation (#18701)
    
    (cherry picked from commit 390b1413624e6154faf50d4a083d7d6efc4d0265)
---
 .../service/schema/BookkeeperSchemaStorage.java    | 164 +++++++++++----------
 .../service/schema/SchemaRegistryServiceImpl.java  | 145 ++++++++++--------
 .../java/org/apache/pulsar/schema/SchemaTest.java  |  55 +++++++
 .../common/protocol/schema/SchemaStorage.java      |  16 ++
 4 files changed, 243 insertions(+), 137 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 4451cab7c92..9094f94eb80 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -41,13 +41,16 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
@@ -115,6 +118,44 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         return putSchema(key, value, hash).thenApply(LongSchemaVersion::new);
     }
 
+    @Override
+    public CompletableFuture<SchemaVersion> put(String key,
+            Function<CompletableFuture<List<CompletableFuture<StoredSchema>>>,
+                    CompletableFuture<Pair<byte[], byte[]>>> fn) {
+        CompletableFuture<SchemaVersion> promise = new CompletableFuture<>();
+        put(key, fn, promise);
+        return promise;
+    }
+
+    private void put(String key,
+             Function<CompletableFuture<List<CompletableFuture<StoredSchema>>>,
+             CompletableFuture<Pair<byte[], byte[]>>> fn,
+             CompletableFuture<SchemaVersion> promise) {
+        CompletableFuture<Pair<Optional<LocatorEntry>, List<CompletableFuture<StoredSchema>>>> schemasWithLocator =
+                getAllWithLocator(key);
+        schemasWithLocator.thenCompose(pair ->
+                fn.apply(completedFuture(pair.getRight())).thenCompose(p -> {
+                    // The schema is existed
+                    if (p == null) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    return putSchema(key, p.getLeft(), p.getRight(), pair.getLeft());
+                }).thenApply(version -> {
+                    return version != null ? new LongSchemaVersion(version) : null;
+                })).whenComplete((v, ex) -> {
+                    if (ex == null) {
+                        promise.complete(v);
+                    } else {
+                        Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                        if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
+                            put(key, fn, promise);
+                        } else {
+                            promise.completeExceptionally(ex);
+                        }
+                    }
+        });
+    }
+
     @Override
     public CompletableFuture<StoredSchema> get(String key, SchemaVersion version) {
         if (version == SchemaVersion.Latest) {
@@ -127,30 +168,32 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
 
     @Override
     public CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key) {
-        CompletableFuture<List<CompletableFuture<StoredSchema>>> result = new CompletableFuture<>();
-        getLocator(key).thenAccept(locator -> {
+        return getAllWithLocator(key).thenApply(Pair::getRight);
+    }
+
+    private CompletableFuture<Pair<Optional<LocatorEntry>, List<CompletableFuture<StoredSchema>>>> getAllWithLocator(
+            String key) {
+        return getLocator(key).thenApply(locator -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Get all schemas - locator: {}", key, locator);
             }
 
-            if (!locator.isPresent()) {
-                result.complete(Collections.emptyList());
-                return;
+            if (locator.isEmpty()) {
+                return Pair.of(locator, Collections.emptyList());
             }
 
             SchemaStorageFormat.SchemaLocator schemaLocator = locator.get().locator;
             List<CompletableFuture<StoredSchema>> list = new ArrayList<>();
             schemaLocator.getIndexList().forEach(indexEntry -> list.add(readSchemaEntry(indexEntry.getPosition())
-                .thenApply(entry -> new StoredSchema
-                    (
-                        entry.getSchemaData().toByteArray(),
-                        new LongSchemaVersion(indexEntry.getVersion())
+                    .thenApply(entry -> new StoredSchema
+                            (
+                                    entry.getSchemaData().toByteArray(),
+                                    new LongSchemaVersion(indexEntry.getVersion())
+                            )
                     )
-                )
             ));
-            result.complete(list);
+            return Pair.of(locator, list);
         });
-        return result;
     }
 
     CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
@@ -283,72 +326,29 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
 
     @NotNull
     private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] hash) {
-        return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
-
-            if (optLocatorEntry.isPresent()) {
+        return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry ->
+                putSchema(schemaId, data, hash, optLocatorEntry));
+    }
 
-                SchemaStorageFormat.SchemaLocator locator = optLocatorEntry.get().locator;
+    private CompletableFuture<Long> putSchema(String schemaId, byte[] data, byte[] hash,
+                                              Optional<LocatorEntry> optLocatorEntry) {
+        if (optLocatorEntry.isPresent()) {
 
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash);
-                }
+            SchemaStorageFormat.SchemaLocator locator = optLocatorEntry.get().locator;
 
-                //don't check the schema whether already exist
-                return readSchemaEntry(locator.getIndexList().get(0).getPosition())
-                        .thenCompose(schemaEntry -> addNewSchemaEntryToStore(schemaId,
-                                locator.getIndexList(), data).thenCompose(
-                                position -> {
-                                    CompletableFuture<Long> future = new CompletableFuture<>();
-                                    updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)
-                                            .thenAccept(future::complete)
-                                            .exceptionally(ex -> {
-                                                if (ex.getCause() instanceof BadVersionException) {
-                                                    // There was a race condition on the schema creation.
-                                                    // Since it has now been created,
-                                                    // retry the whole operation so that we have a chance to
-                                                    // recover without bubbling error
-                                                    putSchema(schemaId, data, hash)
-                                                            .thenAccept(future::complete)
-                                                            .exceptionally(ex2 -> {
-                                                                future.completeExceptionally(ex2);
-                                                                return null;
-                                                            });
-                                                } else {
-                                            // For other errors, just fail the operation
-                                            future.completeExceptionally(ex);
-                                        }
-                                        return null;
-                                    });
-                            return future;
-                        })
-                );
-            } else {
-                // No schema was defined yet
-                CompletableFuture<Long> future = new CompletableFuture<>();
-                createNewSchema(schemaId, data, hash)
-                        .thenAccept(future::complete)
-                        .exceptionally(ex -> {
-                            if (ex.getCause() instanceof AlreadyExistsException
-                                    || ex.getCause() instanceof BadVersionException) {
-                                // There was a race condition on the schema creation. Since it has now been created,
-                                // retry the whole operation so that we have a chance to recover without bubbling error
-                                // back to producer/consumer
-                                putSchema(schemaId, data, hash)
-                                        .thenAccept(future::complete)
-                                        .exceptionally(ex2 -> {
-                                            future.completeExceptionally(ex2);
-                                            return null;
-                                        });
-                            } else {
-                                // For other errors, just fail the operation
-                                future.completeExceptionally(ex);
-                            }
-                            return null;
-                        });
-
-                return future;
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash);
             }
-        });
+
+            //don't check the schema whether already exist
+            return readSchemaEntry(locator.getIndexList().get(0).getPosition())
+                    .thenCompose(schemaEntry -> addNewSchemaEntryToStore(schemaId,
+                            locator.getIndexList(), data).thenCompose(
+                            position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash))
+                    );
+        } else {
+            return createNewSchema(schemaId, data, hash);
+        }
     }
 
     private CompletableFuture<Long> createNewSchema(String schemaId, byte[] data, byte[] hash) {
@@ -481,7 +481,21 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                 .addAllIndex(
                         concat(locator.getIndexList(), newArrayList(info))
                 ).build(), locatorEntry.version
-        ).thenApply(ignore -> nextVersion);
+        ).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            log.warn("[{}] Failed to update schema locator with position {}", schemaId, position, cause);
+            if (cause instanceof AlreadyExistsException || cause instanceof BadVersionException) {
+                bookKeeper.asyncDeleteLedger(position.getLedgerId(), new AsyncCallback.DeleteCallback() {
+                    @Override
+                    public void deleteComplete(int rc, Object ctx) {
+                        if (rc != BKException.Code.OK) {
+                            log.warn("[{}] Failed to delete ledger {} after updating schema locator failed, rc: {}",
+                                    schemaId, position.getLedgerId(), rc);
+                        }
+                    }
+                }, null);
+            }
+        });
     }
 
     @NotNull
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 5fbe91e3bcd..7c0121a5bb6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -45,6 +45,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
 import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
@@ -153,21 +155,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
         long start = this.clock.millis();
 
         return schemaStorage.getAll(schemaId)
-                .thenApply(schemas -> {
-                    List<CompletableFuture<SchemaAndMetadata>> futures = schemas.stream()
-                            .map(future -> future.thenCompose(stored ->
-                                    Functions.bytesToSchemaInfo(stored.data)
-                                            .thenApply(Functions::schemaInfoToSchema)
-                                            .thenApply(schema ->
-                                                    new SchemaAndMetadata(schemaId, schema, stored.version)
-                                            )
-                            ))
-                            .collect(Collectors.toList());
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] {} schemas is found", schemaId, futures.size());
-                    }
-                    return futures;
-                })
+                .thenCompose(schemas -> convertToSchemaAndMetadata(schemaId, schemas))
                 .whenComplete((v, t) -> {
                     if (t != null) {
                         this.stats.recordGetFailed(schemaId);
@@ -177,60 +165,89 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
                 });
     }
 
+    private CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> convertToSchemaAndMetadata(String schemaId,
+             List<CompletableFuture<StoredSchema>> schemas) {
+        List<CompletableFuture<SchemaAndMetadata>> list = schemas.stream()
+                .map(future -> future.thenCompose(stored ->
+                        Functions.bytesToSchemaInfo(stored.data)
+                                .thenApply(Functions::schemaInfoToSchema)
+                                .thenApply(schema ->
+                                        new SchemaAndMetadata(schemaId, schema, stored.version)
+                                )
+                ))
+                .collect(Collectors.toList());
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] {} schemas is found", schemaId, list.size());
+        }
+        return CompletableFuture.completedFuture(list);
+    }
+
     @Override
     @NotNull
     public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema,
                                                               SchemaCompatibilityStrategy strategy) {
-        return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
-                getSchemaVersionBySchemaData(schemaAndMetadataList, schema).thenCompose(schemaVersion -> {
-            if (schemaVersion != null) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Schema is already exists", schemaId);
-                }
-                return CompletableFuture.completedFuture(schemaVersion);
-            }
-            CompletableFuture<Void> checkCompatibilityFuture = new CompletableFuture<>();
-            if (schemaAndMetadataList.size() != 0) {
-                if (isTransitiveStrategy(strategy)) {
-                    checkCompatibilityFuture =
-                            checkCompatibilityWithAll(schemaId, schema, strategy, schemaAndMetadataList);
-                } else {
-                    checkCompatibilityFuture = checkCompatibilityWithLatest(schemaId, schema, strategy);
-                }
-            } else {
-                checkCompatibilityFuture.complete(null);
-            }
-            return checkCompatibilityFuture.thenCompose(v -> {
-                byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
-                SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
-                        .setType(Functions.convertFromDomainType(schema.getType()))
-                        .setSchema(ByteString.copyFrom(schema.getData()))
-                        .setSchemaId(schemaId)
-                        .setUser(schema.getUser())
-                        .setDeleted(false)
-                        .setTimestamp(clock.millis())
-                        .addAllProps(toPairs(schema.getProps()))
-                        .build();
-
-                long start = this.clock.millis();
-
-                return schemaStorage
-                        .put(schemaId, info.toByteArray(), context)
-                        .whenComplete((__, t) -> {
-                            if (t != null) {
-                                log.error("[{}] Put schema failed", schemaId);
-                                this.stats.recordPutFailed(schemaId);
+        MutableLong start = new MutableLong(0);
+        CompletableFuture<SchemaVersion> promise = new CompletableFuture<>();
+        schemaStorage.put(schemaId,
+            schemasFuture -> schemasFuture
+                .thenCompose(schemaFutureList -> trimDeletedSchemaAndGetList(schemaId,
+                        convertToSchemaAndMetadata(schemaId, schemaFutureList)))
+                .thenCompose(schemaAndMetadataList -> getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
+                    .thenCompose(schemaVersion -> {
+                        if (schemaVersion != null) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] Schema is already exists", schemaId);
+                            }
+                            promise.complete(schemaVersion);
+                            return CompletableFuture.completedFuture(null);
+                        }
+                        CompletableFuture<Void> checkCompatibilityFuture = new CompletableFuture<>();
+                        if (schemaAndMetadataList.size() != 0) {
+                            if (isTransitiveStrategy(strategy)) {
+                                checkCompatibilityFuture =
+                                        checkCompatibilityWithAll(schemaId, schema, strategy, schemaAndMetadataList);
                             } else {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("[{}] Put schema finished", schemaId);
-                                }
-                                this.stats.recordPutLatency(schemaId, this.clock.millis() - start);
+                                checkCompatibilityFuture = checkCompatibilityWithLatest(schemaId, schema, strategy);
                             }
+                        } else {
+                            checkCompatibilityFuture.complete(null);
+                        }
+                        return checkCompatibilityFuture.thenCompose(v -> {
+                            byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
+                            SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
+                                    .setType(Functions.convertFromDomainType(schema.getType()))
+                                    .setSchema(ByteString.copyFrom(schema.getData()))
+                                    .setSchemaId(schemaId)
+                                    .setUser(schema.getUser())
+                                    .setDeleted(false)
+                                    .setTimestamp(clock.millis())
+                                    .addAllProps(toPairs(schema.getProps()))
+                                    .build();
+
+                            start.setValue(this.clock.millis());
+                            return CompletableFuture.completedFuture(Pair.of(info.toByteArray(), context));
                         });
-
+                }))).whenComplete((v, ex) -> {
+                    if (ex != null) {
+                        log.error("[{}] Put schema failed", schemaId, ex);
+                        if (start.getValue() != 0) {
+                            this.stats.recordPutFailed(schemaId);
+                        }
+                        promise.completeExceptionally(ex);
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Put schema finished", schemaId);
+                        }
+                        // The schema storage will return null schema version if no schema is persisted to the storage
+                        if (v != null) {
+                            promise.complete(v);
+                            if (start.getValue() != 0) {
+                                this.stats.recordPutLatency(schemaId, this.clock.millis() - start.getValue());
+                            }
+                        }
+                    }
             });
-
-        }));
+        return promise;
     }
 
     @Override
@@ -525,9 +542,13 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     }
 
     public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
+        CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemaFutureList = getAllSchemas(schemaId);
+        return trimDeletedSchemaAndGetList(schemaId, schemaFutureList);
+    }
 
+    private CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId,
+                CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemaFutureList) {
         CompletableFuture<List<SchemaAndMetadata>> schemaResult = new CompletableFuture<>();
-        CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemaFutureList = getAllSchemas(schemaId);
         schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, ex) -> {
             List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() : schemaList;
             if (ex != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 0620326e94d..d3604c785b8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -34,12 +34,16 @@ import com.google.common.collect.Sets;
 import java.io.ByteArrayInputStream;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -1269,6 +1273,57 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         assertThrows(SchemaSerializationException.class, message2::getValue);
     }
 
+    @Test
+    public void testCreateSchemaInParallel() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        String ns = PUBLIC_TENANT + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+
+        final String topic = getTopicName(ns, "testCreateSchemaInParallel");
+        ExecutorService executor = Executors.newFixedThreadPool(16);
+        List<CompletableFuture<Producer<Schemas.PersonOne>>> producers = new ArrayList<>(16);
+        CountDownLatch latch = new CountDownLatch(16);
+        for (int i = 0; i < 16; i++) {
+            executor.execute(() -> {
+                producers.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                        .topic(topic).createAsync());
+                latch.countDown();
+            });
+        }
+        latch.await();
+        FutureUtil.waitForAll(producers).join();
+        assertEquals(admin.schemas().getAllSchemas(topic).size(), 1);
+        producers.clear();
+
+        List<CompletableFuture<Producer<Schemas.PersonThree>>> producers2 = new ArrayList<>(16);
+        CountDownLatch latch2 = new CountDownLatch(16);
+        for (int i = 0; i < 16; i++) {
+            executor.execute(() -> {
+                producers2.add(pulsarClient.newProducer(Schema.AVRO(Schemas.PersonThree.class))
+                        .topic(topic).createAsync());
+                latch2.countDown();
+            });
+        }
+        latch2.await();
+        FutureUtil.waitForAll(producers2).join();
+        assertEquals(admin.schemas().getAllSchemas(topic).size(), 2);
+        producers.forEach(p -> {
+            try {
+                p.join().close();
+            } catch (Exception ignore) {
+            }
+        });
+        producers2.forEach(p -> {
+            try {
+                p.join().close();
+            } catch (Exception ignore) {
+            }
+        });
+        producers.clear();
+        producers2.clear();
+        executor.shutdownNow();
+    }
+
     @EqualsAndHashCode
     static class User implements Serializable {
         private String name;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
index c34154bc827..d93672a58db 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.common.protocol.schema;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * Schema storage.
@@ -28,6 +30,20 @@ public interface SchemaStorage {
 
     CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
 
+    /**
+     * Put the schema to the schema storage.
+     *
+     * @param key The schema ID
+     * @param fn The function to calculate the value and hash that need to put to the schema storage
+     *           The input of the function is all the existing schemas that used to do the schemas compatibility check
+     * @return The schema version of the stored schema
+     */
+    default CompletableFuture<SchemaVersion> put(String key,
+             Function<CompletableFuture<List<CompletableFuture<StoredSchema>>>,
+                     CompletableFuture<Pair<byte[], byte[]>>> fn) {
+        return fn.apply(getAll(key)).thenCompose(pair -> put(key, pair.getLeft(), pair.getRight()));
+    }
+
     CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
 
     CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key);