You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/17 12:52:24 UTC
[pulsar] branch master updated: [Schema] Add schemaType field in SchemaHash (#10573)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ed935a7 [Schema] Add schemaType field in SchemaHash (#10573)
ed935a7 is described below
commit ed935a75e9309cf7f52507095153947ccec8f82c
Author: ran <ga...@126.com>
AuthorDate: Mon May 17 20:51:41 2021 +0800
[Schema] Add schemaType field in SchemaHash (#10573)
---
.../service/schema/SchemaRegistryServiceImpl.java | 2 +-
.../apache/pulsar/client/api/InterceptorsTest.java | 11 +++++--
.../java/org/apache/pulsar/schema/SchemaTest.java | 37 ++++++++++++++++++++++
.../pulsar/common/protocol/schema/SchemaHash.java | 18 +++++++----
4 files changed, 58 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index f3afd3d..0eff36b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -300,7 +300,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
Schema newSchema = parser.parse(new String(schemaData.getData(), UTF_8));
for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
- if (isUsingAvroSchemaParser(schemaData.getType())) {
+ if (isUsingAvroSchemaParser(schemaAndMetadata.schema.getType())) {
Schema.Parser existParser = new Schema.Parser();
Schema existSchema = existParser.parse(new String(schemaAndMetadata.schema.getData(), UTF_8));
if (newSchema.equals(existSchema) && schemaAndMetadata.schema.getType() == schemaData.getType()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index f41191a..7c88654 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -28,9 +28,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,9 +67,13 @@ public class InterceptorsTest extends ProducerConsumerBase {
}
@Test
- public void testProducerInterceptor() throws PulsarClientException {
+ public void testProducerInterceptor() throws Exception {
Map<MessageId, List<String>> ackCallback = new HashMap<>();
+ String ns = "my-property/my-ns" + RandomUtils.nextInt(999, 1999);
+ admin.namespaces().createNamespace(ns, Sets.newHashSet("test"));
+ admin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
abstract class BaseInterceptor implements
org.apache.pulsar.client.api.interceptor.ProducerInterceptor {
private static final String set = "set";
@@ -118,7 +125,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic")
+ .topic("persistent://" + ns + "/my-topic")
.intercept(interceptor1, interceptor2, interceptor3)
.create();
MessageId messageId = producer.newMessage().property("STR", "Y")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 744a950..259aaa0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -59,12 +59,14 @@ import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -704,4 +706,39 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
}
}
}
+
+ @Test
+ public void testProducerMultipleSchemaMessages() throws Exception {
+ final String tenant = PUBLIC_TENANT;
+ final String namespace = "test-namespace-" + randomName(16);
+ final String topicName = "auto_schema_test";
+
+ String ns = tenant + "/" + namespace;
+ admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+ admin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ final String topic = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicName).toString();
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+ .topic(topic)
+ .create();
+
+ producer.newMessage(Schema.STRING).value("test").send();
+ producer.newMessage(Schema.JSON(Schemas.PersonThree.class)).value(new Schemas.PersonThree(0, "ran")).send();
+ producer.newMessage(Schema.AVRO(Schemas.PersonThree.class)).value(new Schemas.PersonThree(0, "ran")).send();
+ producer.newMessage(Schema.AVRO(Schemas.PersonOne.class)).value(new Schemas.PersonOne(0)).send();
+ producer.newMessage(Schema.JSON(Schemas.PersonThree.class)).value(new Schemas.PersonThree(1, "tang")).send();
+ producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
+ producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
+ producer.newMessage(Schema.BOOL).value(true).send();
+
+ List<SchemaInfo> allSchemas = admin.schemas().getAllSchemas(topic);
+ Assert.assertEquals(allSchemas.size(), 5);
+ Assert.assertEquals(allSchemas.get(0), Schema.STRING.getSchemaInfo());
+ Assert.assertEquals(allSchemas.get(1), Schema.JSON(Schemas.PersonThree.class).getSchemaInfo());
+ Assert.assertEquals(allSchemas.get(2), Schema.AVRO(Schemas.PersonThree.class).getSchemaInfo());
+ Assert.assertEquals(allSchemas.get(3), Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
+ Assert.assertEquals(allSchemas.get(4), Schema.BOOL.getSchemaInfo());
+ }
+
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
index da7657e..57aa3d4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
@@ -25,6 +25,7 @@ import java.util.Optional;
import lombok.EqualsAndHashCode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
/**
* Schema hash wrapper with a HashCode inner type.
@@ -36,22 +37,25 @@ public class SchemaHash {
private final HashCode hash;
- private SchemaHash(HashCode hash) {
+ private final SchemaType schemaType;
+
+ private SchemaHash(HashCode hash, SchemaType schemaType) {
this.hash = hash;
+ this.schemaType = schemaType;
}
public static SchemaHash of(Schema schema) {
- return of(Optional.ofNullable(schema)
- .map(Schema::getSchemaInfo)
- .map(SchemaInfo::getSchema).orElse(new byte[0]));
+ Optional<SchemaInfo> schemaInfo = Optional.ofNullable(schema).map(Schema::getSchemaInfo);
+ return of(schemaInfo.map(SchemaInfo::getSchema).orElse(new byte[0]),
+ schemaInfo.map(SchemaInfo::getType).orElse(null));
}
public static SchemaHash of(SchemaData schemaData) {
- return of(schemaData.getData());
+ return of(schemaData.getData(), schemaData.getType());
}
- private static SchemaHash of(byte[] schemaBytes) {
- return new SchemaHash(hashFunction.hashBytes(schemaBytes));
+ private static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) {
+ return new SchemaHash(hashFunction.hashBytes(schemaBytes), schemaType);
}
public byte[] asBytes() {