You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:14 UTC
[pulsar] 28/38: Fix Get schema by version can get the deleted schema info #6754 (#6764)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cadc852b36c18d6496a8c6ca82eda6b63087ef80
Author: feynmanlin <31...@qq.com>
AuthorDate: Thu Apr 30 21:27:38 2020 +0800
Fix Get schema by version can get the deleted schema info #6754 (#6764)
(cherry picked from commit 466b0b89af61a0d9c7aa7e6220c91f366f2d3831)
---
.../service/schema/BookkeeperSchemaStorage.java | 13 +++----
.../service/schema/SchemaRegistryServiceImpl.java | 37 ++++++++++++++-----
.../broker/service/PersistentTopicE2ETest.java | 42 ++++++++++++++++++++++
.../broker/service/schema/SchemaServiceTest.java | 4 +--
4 files changed, 80 insertions(+), 16 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index df41e71..b013c8b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -237,10 +237,10 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
return findSchemaEntryByVersion(schemaLocator.getIndexList(), version)
.thenApply(entry ->
- new StoredSchema(
- entry.getSchemaData().toByteArray(),
- new LongSchemaVersion(version)
- )
+ new StoredSchema(
+ entry.getSchemaData().toByteArray(),
+ new LongSchemaVersion(version)
+ )
);
});
}
@@ -387,11 +387,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
.setPosition(position)
.setHash(copyFrom(hash))
.build();
+
return updateSchemaLocator(getSchemaPath(schemaId),
SchemaStorageFormat.SchemaLocator.newBuilder()
.setInfo(info)
.addAllIndex(
- concat(locator.getIndexList(), newArrayList(info))
+ concat(locator.getIndexList(), newArrayList(info))
).build(), locatorEntry.zkZnodeVersion
).thenApply(ignore -> nextVersion);
}
@@ -409,7 +410,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
SchemaStorageFormat.IndexEntry lowest = index.get(0);
if (version < lowest.getVersion()) {
return readSchemaEntry(lowest.getPosition())
- .thenCompose(entry -> findSchemaEntryByVersion(entry.getIndexList(), version));
+ .thenCompose(entry -> findSchemaEntryByVersion(entry.getIndexList(), version));
}
for (SchemaStorageFormat.IndexEntry entry : index) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 7307e4c..30d5d77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -44,12 +44,14 @@ import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
+import org.apache.pulsar.common.protocol.schema.StoredSchema;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
@@ -87,15 +89,34 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
@Override
@NotNull
public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVersion version) {
- return schemaStorage.get(schemaId, version).thenCompose(stored -> {
- if (isNull(stored)) {
- return completedFuture(null);
- } else {
- return Functions.bytesToSchemaInfo(stored.data)
- .thenApply(Functions::schemaInfoToSchema)
- .thenApply(schema -> new SchemaAndMetadata(schemaId, schema, stored.version));
+ CompletableFuture<StoredSchema> completableFuture;
+ if (version == SchemaVersion.Latest) {
+ completableFuture = schemaStorage.get(schemaId, version);
+ } else {
+ long longVersion = ((LongSchemaVersion) version).getVersion();
+ //If the schema has been deleted, it cannot be obtained
+ completableFuture = trimDeletedSchemaAndGetList(schemaId)
+ .thenApply(metadataList -> metadataList.stream().filter(schemaAndMetadata ->
+ ((LongSchemaVersion) schemaAndMetadata.version).getVersion() == longVersion)
+ .collect(Collectors.toList())
+ ).thenCompose(metadataList -> {
+ if (CollectionUtils.isNotEmpty(metadataList)) {
+ return schemaStorage.get(schemaId, version);
+ }
+ return completedFuture(null);
+ }
+ );
+ }
+
+ return completableFuture.thenCompose(stored -> {
+ if (isNull(stored)) {
+ return completedFuture(null);
+ } else {
+ return Functions.bytesToSchemaInfo(stored.data)
+ .thenApply(Functions::schemaInfoToSchema)
+ .thenApply(schema -> new SchemaAndMetadata(schemaId, schema, stored.version));
+ }
}
- }
);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 42190c7..38e85d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -38,6 +39,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+
+import lombok.Cleanup;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -65,8 +68,10 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -682,6 +687,43 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
assertFalse(topicHasSchema(topicName));
}
+ @Test
+ public void testDeleteSchema() throws Exception {
+ PulsarClientImpl httpProtocolClient = (PulsarClientImpl) PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
+ PulsarClientImpl binaryProtocolClient = (PulsarClientImpl) pulsarClient;
+ LookupService binaryLookupService = binaryProtocolClient.getLookup();
+ LookupService httpLookupService = httpProtocolClient.getLookup();
+
+ String topicName = "persistent://prop/ns-abc/topic-1";
+ //Topic is not GCed with live connection
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+
+ Optional<Topic> topic = getTopic(topicName);
+ assertTrue(topic.isPresent());
+
+ byte[] data = JSONSchema.of(SchemaDefinition.builder()
+ .withPojo(Foo.class).build()).getSchemaInfo().getSchema();
+ SchemaData schemaData = SchemaData.builder()
+ .data(data)
+ .type(SchemaType.BYTES)
+ .user("foo").build();
+
+ topic.get().addSchema(schemaData).join();
+ assertTrue(topicHasSchema(topicName));
+
+ Assert.assertEquals(admin.schemas().getAllSchemas(topicName).size(), 1);
+ assertTrue(httpLookupService.getSchema(TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
+ assertTrue(binaryLookupService.getSchema(TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
+
+ topic.get().deleteSchema().join();
+ Assert.assertEquals(admin.schemas().getAllSchemas(topicName).size(), 0);
+ assertFalse(httpLookupService.getSchema(TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
+ assertFalse(binaryLookupService.getSchema(TopicName.get(topicName), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent());
+
+ assertFalse(topicHasSchema(topicName));
+ }
+
/**
* A topic that has retention policy set to non-0, should not be GCed until it has been inactive for at least the
* retention time.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index ab7e910..00b09cb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -226,8 +226,8 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
deleteSchema(schemaId1, version(7));
- SchemaData version7 = getSchema(schemaId1, version(7));
- assertTrue(version7.isDeleted());
+ SchemaRegistry.SchemaAndMetadata version7 = schemaRegistryService.getSchema(schemaId1, version(7)).get();
+ assertNull(version7);
}