You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 06:59:04 UTC

[pulsar] 07/07: Fix race condition in concurrent schema deletion (#11606)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0873c09441fe49824a4890691f22c6690de428aa
Author: Zhanpeng Wu <zh...@qq.com>
AuthorDate: Tue Aug 17 13:05:31 2021 +0800

    Fix race condition in concurrent schema deletion (#11606)
    
    This PR fixes #11605
    
    ### Motivation
    
    Concurrently deleting topics that share the same schema may cause a race condition on the broker side. If these scenarios are not handled correctly, unexpected exceptions show up in the broker logs.
    
    ### Modifications
    
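    1. Add existence checks before schema deletion in `AbstractTopic#deleteSchema`: the schema lookup now goes through `BookkeeperSchemaStorage.ignoreUnrecoverableBKException`, so a schema whose BookKeeper data has already been deleted by another operation is treated as absent instead of failing.
    2. Add existence checks before actually performing schema storage deletion in `BookkeeperSchemaStorage#deleteSchema`, using the same `ignoreUnrecoverableBKException` wrapper around the non-forced `getSchema` lookup.
    3. Ignore the "node not found" error (ZooKeeper's `NoNodeException`, surfaced as `MetadataStoreException.NotFoundException`) when deleting the schema path in `BookkeeperSchemaStorage#deleteSchema`, since it means another operation already removed the znode.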
    
    (cherry picked from commit 43ded5927fb78add69196b46c1edc36bde77af0e)
---
 .../pulsar/broker/service/AbstractTopic.java       |  3 +-
 .../service/schema/BookkeeperSchemaStorage.java    | 52 ++++++++++++++++++----
 .../broker/service/PersistentTopicE2ETest.java     | 27 +++++++++++
 3 files changed, 73 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 39b0650..c8b1079 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyExcep
 import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -357,7 +358,7 @@ public abstract class AbstractTopic implements Topic {
         String base = TopicName.get(getName()).getPartitionedTopicName();
         String id = TopicName.get(base).getSchemaName();
         SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
-        return schemaRegistryService.getSchema(id)
+        return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
                 .thenCompose(schema -> {
                     if (schema != null) {
                         // It's different from `SchemasResource.deleteSchema`
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index c82e30b..0d05da9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -58,6 +59,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -174,7 +176,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
 
     @Override
     public CompletableFuture<SchemaVersion> delete(String key, boolean forcefully) {
-        return deleteSchema(key, forcefully).thenApply(LongSchemaVersion::new);
+        return deleteSchema(key, forcefully).thenApply(version -> {
+            if (version == null) {
+                return null;
+            }
+            return new LongSchemaVersion(version);
+        });
     }
 
     @Override
@@ -369,10 +376,10 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     }
 
     @NotNull
-    private CompletableFuture<Long> deleteSchema(String schemaId, boolean forceFully) {
-        return (forceFully ? CompletableFuture.completedFuture(null) : getSchema(schemaId))
-                .thenCompose(schemaAndVersion -> {
-            if (!forceFully && isNull(schemaAndVersion)) {
+    private CompletableFuture<Long> deleteSchema(String schemaId, boolean forcefully) {
+        return (forcefully ? CompletableFuture.completedFuture(null)
+                : ignoreUnrecoverableBKException(getSchema(schemaId))).thenCompose(schemaAndVersion -> {
+            if (!forcefully && isNull(schemaAndVersion)) {
                 return completedFuture(null);
             } else {
                 // The version is only for the compatibility of the current interface
@@ -405,9 +412,20 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                             store.delete(path, Optional.empty())
                                     .thenRun(() -> {
                                         future.complete(version);
-                                    }).exceptionally(ex1 -> {
-                                future.completeExceptionally(ex1);
-                                return null;
+                                    }).exceptionally(zkException -> {
+                                        if (zkException.getCause()
+                                                instanceof MetadataStoreException.NotFoundException) {
+                                            // The znode has been deleted by others.
+                                            // In some cases, the program may enter this logic.
+                                            // Since the znode is gone, we don’t need to deal with it.
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("No node for schema path: {}", path);
+                                            }
+                                            future.complete(null);
+                                        } else {
+                                            future.completeExceptionally(zkException);
+                                        }
+                                        return null;
                             });
                         });
                     }
@@ -681,4 +699,22 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                 && rc != BKException.Code.NoSuchEntryException;
         return new SchemaException(recoverable, message);
     }
+
+    public static <T> CompletableFuture<T> ignoreUnrecoverableBKException(CompletableFuture<T> source) {
+        return source.exceptionally(t -> {
+            if (t.getCause() != null
+                    && (t.getCause() instanceof SchemaException)
+                    && !((SchemaException) t.getCause()).isRecoverable()) {
+                // Meeting NoSuchLedgerExistsException or NoSuchEntryException when reading schemas in
+                // bookkeeper. This also means that the data has already been deleted by other operations
+                // in deleting schema.
+                if (log.isDebugEnabled()) {
+                    log.debug("Schema data in bookkeeper may be deleted by other operations.", t);
+                }
+                return null;
+            }
+            // rethrow other cases
+            throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t);
+        });
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index c39c692..8e0f5b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.schema.Schemas;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -731,6 +732,32 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         assertFalse(topicHasSchema(topicName));
     }
 
+    @Test
+    public void testConcurrentlyDeleteSchema() throws Exception {
+        String topic = "persistent://prop/ns-delete-schema/concurrently-delete-schema-test";
+        int partitions = 50;
+        admin.namespaces().createNamespace("prop/ns-delete-schema", 3);
+        admin.topics().createPartitionedTopic(topic, partitions);
+
+        Producer producer = pulsarClient
+                .newProducer(Schema.JSON(Schemas.BytesRecord.class))
+                .topic(topic)
+                .create();
+        producer.close();
+
+        CompletableFuture[] asyncFutures = new CompletableFuture[partitions];
+        for (int i = 0; i < partitions; i++) {
+            asyncFutures[i] = getTopic(TopicName.get(topic).getPartition(i).toString()).get().deleteSchema();
+        }
+
+        try {
+            // delete the schema concurrently, and wait for the end of all operations
+            CompletableFuture.allOf(asyncFutures).join();
+        } catch (Exception e) {
+            fail("Should not fail");
+        }
+    }
+
     /**
      * A topic that has retention policy set to non-0, should not be GCed until it has been inactive for at least the
      * retention time.