You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/29 09:08:48 UTC
[pulsar] 01/02: Fix schema ledger deletion when deleting topic with delete schema. (#10383)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 26c2837cb9e374942494e7f32f2d4d8f6df480f6
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Apr 27 23:57:34 2021 +0800
Fix schema ledger deletion when deleting topic with delete schema. (#10383)
* Fix schema ledger deletion when deleting topic with delete schema.
* Revert public
* Apply comments.
* Apply comment.
* Fix checkstyle.
* Fix test
(cherry picked from commit a22782490bb9a17411b749326d1f084b096998c8)
---
.../service/schema/BookkeeperSchemaStorage.java | 77 ++++++++++----------
.../java/org/apache/pulsar/schema/SchemaTest.java | 83 +++++++++++++++++++---
2 files changed, 114 insertions(+), 46 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 2ecc927..521db90 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -41,7 +41,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
@@ -83,10 +82,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
private final ServiceConfiguration config;
private BookKeeper bookKeeper;
- // schemaId => ledgers of the schemaId
- private final Map<String, List<Long>> schemaLedgers = new ConcurrentHashMap<>();
-
- private final ConcurrentMap<String, CompletableFuture<StoredSchema>> readSchemaOperations = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, CompletableFuture<StoredSchema>> readSchemaOperations =
+ new ConcurrentHashMap<>();
@VisibleForTesting
BookkeeperSchemaStorage(PulsarService pulsar) {
@@ -160,7 +157,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
return result;
}
- private CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
+ CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
return getSchemaLocator(getSchemaPath(key));
}
@@ -168,8 +165,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
localZkCache.invalidate(getSchemaPath(key));
}
- @VisibleForTesting
- List<Long> getSchemaLedgerList(String key) throws IOException {
+ public List<Long> getSchemaLedgerList(String key) throws IOException {
Optional<LocatorEntry> locatorEntry = null;
try {
locatorEntry = getLocator(key).get();
@@ -390,33 +386,44 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
} else {
// The version is only for the compatibility of the current interface
final long version = -1;
- final List<Long> ledgerIds = schemaLedgers.get(schemaId);
- if (ledgerIds != null) {
- CompletableFuture<Long> future = new CompletableFuture<>();
- final AtomicInteger numOfLedgerIds = new AtomicInteger(ledgerIds.size());
- for (long ledgerId : ledgerIds) {
- bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> {
- if (rc != BKException.Code.OK) {
- // It's not a serious error, we didn't need call future.completeExceptionally()
- log.warn("Failed to delete ledger {} of {}: {}", ledgerId, schemaId, rc);
- }
- if (numOfLedgerIds.decrementAndGet() == 0) {
- try {
- ZkUtils.deleteFullPathOptimistic(zooKeeper, getSchemaPath(schemaId), -1);
- } catch (InterruptedException | KeeperException e) {
- future.completeExceptionally(e);
+ CompletableFuture<Long> future = new CompletableFuture<>();
+ getLocator(schemaId).whenComplete((locator, ex) -> {
+ if (ex != null) {
+ future.completeExceptionally(ex);
+ } else {
+ if (!locator.isPresent()) {
+ future.complete(null);
+ return;
+ }
+ List<SchemaStorageFormat.IndexEntry> indexEntryList = locator.get().locator.getIndexList();
+ List<CompletableFuture<Void>> deleteFutures = new ArrayList<>(indexEntryList.size());
+ indexEntryList.forEach(indexEntry -> {
+ final long ledgerId = indexEntry.getPosition().getLedgerId();
+ CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
+ deleteFutures.add(deleteFuture);
+ bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> {
+ if (rc != BKException.Code.OK) {
+ // It's not a serious error, we didn't need call future.completeExceptionally()
+ log.warn("Failed to delete ledger {} of {}: {}", ledgerId, schemaId, rc);
}
- clearLocatorCache(getSchemaPath(schemaId));
- future.complete(version);
- }
- }, null);
+ deleteFuture.complete(null);
+ }, null);
+ });
+ FutureUtil.waitForAll(deleteFutures).whenComplete((v, e) -> {
+ final String path = getSchemaPath(schemaId);
+ ZkUtils.asyncDeleteFullPathOptimistic(zooKeeper, path, -1, (rc, path1, ctx) -> {
+ if (rc != Code.OK.intValue()) {
+ future.completeExceptionally(KeeperException.create(Code.get(rc)));
+ } else {
+ clearLocatorCache(getSchemaPath(schemaId));
+ future.complete(version);
+ }
+ }, path);
+
+ });
}
- return future;
- } else {
- // It should never reach here
- log.warn("No ledgers for schema id: {}", schemaId);
- return completedFuture(version);
- }
+ });
+ return future;
}
});
}
@@ -578,10 +585,6 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
} else {
- schemaLedgers.computeIfAbsent(
- schemaId,
- key -> Collections.synchronizedList(new ArrayList<>())
- ).add(handle.getId());
future.complete(handle);
}
}, null, metadata);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 595da2b..d72cbf1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -20,13 +20,23 @@ package org.apache.pulsar.schema;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.collect.Sets;
import java.util.Collections;
-
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -39,14 +49,11 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.collect.Sets;
-
-import lombok.extern.slf4j.Slf4j;
-
@Slf4j
public class SchemaTest extends MockedPulsarServiceBaseTest {
@@ -206,4 +213,62 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
}
}
}
+
+ @Test
+ public void testDeleteTopicAndSchema() throws Exception {
+ final String tenant = PUBLIC_TENANT;
+ final String namespace = "test-namespace-" + randomName(16);
+ final String topicName = "test-delete-topic-and-schema";
+
+ final String topic = TopicName.get(
+ TopicDomain.persistent.value(),
+ tenant,
+ namespace,
+ topicName).toString();
+
+ admin.namespaces().createNamespace(
+ tenant + "/" + namespace,
+ Sets.newHashSet(CLUSTER_NAME));
+
+ @Cleanup
+ Producer<Schemas.PersonOne> p1 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+ .topic(topic)
+ .create();
+
+ @Cleanup
+ Producer<Schemas.PersonThree> p2 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class))
+ .topic(topic)
+ .create();
+
+ List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> schemaFutures =
+ this.pulsar.getSchemaRegistryService().getAllSchemas(TopicName.get(topic).getSchemaName()).get();
+ FutureUtil.waitForAll(schemaFutures).get();
+ List<SchemaRegistry.SchemaAndMetadata> schemas = schemaFutures.stream().map(future -> {
+ try {
+ return future.get();
+ } catch (Exception e) {
+ return null;
+ }
+ }).collect(Collectors.toList());
+
+ assertEquals(schemas.size(), 2);
+ for (SchemaRegistry.SchemaAndMetadata schema : schemas) {
+ assertNotNull(schema);
+ }
+
+ List<Long> ledgers = ((BookkeeperSchemaStorage)this.pulsar.getSchemaStorage())
+ .getSchemaLedgerList(TopicName.get(topic).getSchemaName());
+ assertEquals(ledgers.size(), 2);
+ admin.topics().delete(topic, true, true);
+ assertEquals(this.pulsar.getSchemaRegistryService()
+ .trimDeletedSchemaAndGetList(TopicName.get(topic).getSchemaName()).get().size(), 0);
+
+ for (Long ledger : ledgers) {
+ try {
+ pulsar.getBookKeeperClient().openLedger(ledger, BookKeeper.DigestType.CRC32, new byte[]{});
+ fail();
+ } catch (BKException.BKNoSuchLedgerExistsException ignore) {
+ }
+ }
+ }
}