You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:14 UTC

[pulsar] 28/38: Fix Get schema by version can get the deleted schema info #6754 (#6764)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cadc852b36c18d6496a8c6ca82eda6b63087ef80
Author: feynmanlin <31...@qq.com>
AuthorDate: Thu Apr 30 21:27:38 2020 +0800

    Fix Get schema by version can get the deleted schema info #6754 (#6764)
    
    (cherry picked from commit 466b0b89af61a0d9c7aa7e6220c91f366f2d3831)
---
 .../service/schema/BookkeeperSchemaStorage.java    | 13 +++----
 .../service/schema/SchemaRegistryServiceImpl.java  | 37 ++++++++++++++-----
 .../broker/service/PersistentTopicE2ETest.java     | 42 ++++++++++++++++++++++
 .../broker/service/schema/SchemaServiceTest.java   |  4 +--
 4 files changed, 80 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index df41e71..b013c8b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -237,10 +237,10 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
 
             return findSchemaEntryByVersion(schemaLocator.getIndexList(), version)
                 .thenApply(entry ->
-                    new StoredSchema(
-                        entry.getSchemaData().toByteArray(),
-                        new LongSchemaVersion(version)
-                    )
+                        new StoredSchema(
+                            entry.getSchemaData().toByteArray(),
+                            new LongSchemaVersion(version)
+                        )
                 );
         });
     }
@@ -387,11 +387,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                 .setPosition(position)
                 .setHash(copyFrom(hash))
                 .build();
+
         return updateSchemaLocator(getSchemaPath(schemaId),
             SchemaStorageFormat.SchemaLocator.newBuilder()
                 .setInfo(info)
                 .addAllIndex(
-                    concat(locator.getIndexList(), newArrayList(info))
+                        concat(locator.getIndexList(), newArrayList(info))
                 ).build(), locatorEntry.zkZnodeVersion
         ).thenApply(ignore -> nextVersion);
     }
@@ -409,7 +410,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         SchemaStorageFormat.IndexEntry lowest = index.get(0);
         if (version < lowest.getVersion()) {
             return readSchemaEntry(lowest.getPosition())
-                .thenCompose(entry -> findSchemaEntryByVersion(entry.getIndexList(), version));
+                    .thenCompose(entry -> findSchemaEntryByVersion(entry.getIndexList(), version));
         }
 
         for (SchemaStorageFormat.IndexEntry entry : index) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 7307e4c..30d5d77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -44,12 +44,14 @@ import java.util.stream.Collectors;
 import javax.validation.constraints.NotNull;
 
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaHash;
 import org.apache.pulsar.common.protocol.schema.SchemaStorage;
+import org.apache.pulsar.common.protocol.schema.StoredSchema;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -87,15 +89,34 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
     @Override
     @NotNull
     public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVersion version) {
-        return schemaStorage.get(schemaId, version).thenCompose(stored -> {
-                if (isNull(stored)) {
-                    return completedFuture(null);
-                } else {
-                    return Functions.bytesToSchemaInfo(stored.data)
-                        .thenApply(Functions::schemaInfoToSchema)
-                        .thenApply(schema -> new SchemaAndMetadata(schemaId, schema, stored.version));
+        CompletableFuture<StoredSchema> completableFuture;
+        if (version == SchemaVersion.Latest) {
+            completableFuture = schemaStorage.get(schemaId, version);
+        } else {
+            long longVersion = ((LongSchemaVersion) version).getVersion();
+            // A deleted schema must not be returned: only read it from storage if its version survives trimming
+            completableFuture = trimDeletedSchemaAndGetList(schemaId)
+                .thenApply(metadataList -> metadataList.stream().filter(schemaAndMetadata ->
+                        ((LongSchemaVersion) schemaAndMetadata.version).getVersion() == longVersion)
+                        .collect(Collectors.toList())
+                ).thenCompose(metadataList -> {
+                        if (CollectionUtils.isNotEmpty(metadataList)) {
+                            return schemaStorage.get(schemaId, version);
+                        }
+                        return completedFuture(null);
+                    }
+                );
+        }
+
+        return completableFuture.thenCompose(stored -> {
+                    if (isNull(stored)) {
+                        return completedFuture(null);
+                    } else {
+                        return Functions.bytesToSchemaInfo(stored.data)
+                                .thenApply(Functions::schemaInfoToSchema)
+                                .thenApply(schema -> new SchemaAndMetadata(schemaId, schema, stored.version));
+                    }
                 }
-            }
         );
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 42190c7..38e85d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
+import lombok.Cleanup;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -65,8 +68,10 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -682,6 +687,43 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         assertFalse(topicHasSchema(topicName));
     }
 
+    @Test
+    public void testDeleteSchema() throws Exception {
+        PulsarClientImpl httpProtocolClient = (PulsarClientImpl) PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
+        PulsarClientImpl binaryProtocolClient = (PulsarClientImpl) pulsarClient;
+        LookupService binaryLookupService = binaryProtocolClient.getLookup();
+        LookupService httpLookupService = httpProtocolClient.getLookup();
+
+        String topicName = "persistent://prop/ns-abc/topic-1";
+        // Keep a producer connected: a topic with a live connection is not garbage-collected
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+
+        Optional<Topic> topic = getTopic(topicName);
+        assertTrue(topic.isPresent());
+
+        byte[] data = JSONSchema.of(SchemaDefinition.builder()
+                .withPojo(Foo.class).build()).getSchemaInfo().getSchema();
+        SchemaData schemaData = SchemaData.builder()
+                .data(data)
+                .type(SchemaType.BYTES)
+                .user("foo").build();
+
+        topic.get().addSchema(schemaData).join();
+        assertTrue(topicHasSchema(topicName));
+
+        Assert.assertEquals(admin.schemas().getAllSchemas(topicName).size(), 1);
+        assertTrue(httpLookupService.getSchema(TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
+        assertTrue(binaryLookupService.getSchema(TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
+
+        topic.get().deleteSchema().join();
+        Assert.assertEquals(admin.schemas().getAllSchemas(topicName).size(), 0);
+        assertFalse(httpLookupService.getSchema(TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
+        assertFalse(binaryLookupService.getSchema(TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
+
+        assertFalse(topicHasSchema(topicName));
+    }
+
     /**
      * A topic that has retention policy set to non-0, should not be GCed until it has been inactive for at least the
      * retention time.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index ab7e910..00b09cb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -226,8 +226,8 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
 
         deleteSchema(schemaId1, version(7));
 
-        SchemaData version7 = getSchema(schemaId1, version(7));
-        assertTrue(version7.isDeleted());
+        SchemaRegistry.SchemaAndMetadata version7 = schemaRegistryService.getSchema(schemaId1, version(7)).get();
+        assertNull(version7);
 
     }