You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/17 12:52:24 UTC

[pulsar] branch master updated: [Schema] Add schemaType field in SchemaHash (#10573)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ed935a7  [Schema] Add schemaType field in SchemaHash (#10573)
ed935a7 is described below

commit ed935a75e9309cf7f52507095153947ccec8f82c
Author: ran <ga...@126.com>
AuthorDate: Mon May 17 20:51:41 2021 +0800

    [Schema] Add schemaType field in SchemaHash (#10573)
---
 .../service/schema/SchemaRegistryServiceImpl.java  |  2 +-
 .../apache/pulsar/client/api/InterceptorsTest.java | 11 +++++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 37 ++++++++++++++++++++++
 .../pulsar/common/protocol/schema/SchemaHash.java  | 18 +++++++----
 4 files changed, 58 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index f3afd3d..0eff36b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -300,7 +300,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
             Schema newSchema = parser.parse(new String(schemaData.getData(), UTF_8));
 
             for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
-                if (isUsingAvroSchemaParser(schemaData.getType())) {
+                if (isUsingAvroSchemaParser(schemaAndMetadata.schema.getType())) {
                     Schema.Parser existParser = new Schema.Parser();
                     Schema existSchema = existParser.parse(new String(schemaAndMetadata.schema.getData(), UTF_8));
                     if (newSchema.equals(existSchema) && schemaAndMetadata.schema.getType() == schemaData.getType()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index f41191a..7c88654 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -28,9 +28,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,9 +67,13 @@ public class InterceptorsTest extends ProducerConsumerBase {
     }
 
     @Test
-    public void testProducerInterceptor() throws PulsarClientException {
+    public void testProducerInterceptor() throws Exception {
         Map<MessageId, List<String>> ackCallback = new HashMap<>();
 
+        String ns = "my-property/my-ns" + RandomUtils.nextInt(999, 1999);
+        admin.namespaces().createNamespace(ns, Sets.newHashSet("test"));
+        admin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
         abstract class BaseInterceptor implements
                 org.apache.pulsar.client.api.interceptor.ProducerInterceptor {
             private static final String set = "set";
@@ -118,7 +125,7 @@ public class InterceptorsTest extends ProducerConsumerBase {
         };
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
-                .topic("persistent://my-property/my-ns/my-topic")
+                .topic("persistent://" + ns + "/my-topic")
                 .intercept(interceptor1, interceptor2, interceptor3)
                 .create();
         MessageId messageId = producer.newMessage().property("STR", "Y")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 744a950..259aaa0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -59,12 +59,14 @@ import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -704,4 +706,39 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
             }
         }
     }
+
+    @Test
+    public void testProducerMultipleSchemaMessages() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicName = "auto_schema_test";
+
+        String ns = tenant + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+        admin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        final String topic = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicName).toString();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+                .topic(topic)
+                .create();
+
+        producer.newMessage(Schema.STRING).value("test").send();
+        producer.newMessage(Schema.JSON(Schemas.PersonThree.class)).value(new Schemas.PersonThree(0, "ran")).send();
+        producer.newMessage(Schema.AVRO(Schemas.PersonThree.class)).value(new Schemas.PersonThree(0, "ran")).send();
+        producer.newMessage(Schema.AVRO(Schemas.PersonOne.class)).value(new Schemas.PersonOne(0)).send();
+        producer.newMessage(Schema.JSON(Schemas.PersonThree.class)).value(new Schemas.PersonThree(1, "tang")).send();
+        producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
+        producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
+        producer.newMessage(Schema.BOOL).value(true).send();
+
+        List<SchemaInfo> allSchemas = admin.schemas().getAllSchemas(topic);
+        Assert.assertEquals(allSchemas.size(), 5);
+        Assert.assertEquals(allSchemas.get(0), Schema.STRING.getSchemaInfo());
+        Assert.assertEquals(allSchemas.get(1), Schema.JSON(Schemas.PersonThree.class).getSchemaInfo());
+        Assert.assertEquals(allSchemas.get(2), Schema.AVRO(Schemas.PersonThree.class).getSchemaInfo());
+        Assert.assertEquals(allSchemas.get(3), Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
+        Assert.assertEquals(allSchemas.get(4), Schema.BOOL.getSchemaInfo());
+    }
+
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
index da7657e..57aa3d4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import lombok.EqualsAndHashCode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 
 /**
  * Schema hash wrapper with a HashCode inner type.
@@ -36,22 +37,25 @@ public class SchemaHash {
 
     private final HashCode hash;
 
-    private SchemaHash(HashCode hash) {
+    private final SchemaType schemaType;
+
+    private SchemaHash(HashCode hash, SchemaType schemaType) {
         this.hash = hash;
+        this.schemaType = schemaType;
     }
 
     public static SchemaHash of(Schema schema) {
-        return of(Optional.ofNullable(schema)
-                          .map(Schema::getSchemaInfo)
-                          .map(SchemaInfo::getSchema).orElse(new byte[0]));
+        Optional<SchemaInfo> schemaInfo = Optional.ofNullable(schema).map(Schema::getSchemaInfo);
+        return of(schemaInfo.map(SchemaInfo::getSchema).orElse(new byte[0]),
+                schemaInfo.map(SchemaInfo::getType).orElse(null));
     }
 
     public static SchemaHash of(SchemaData schemaData) {
-        return of(schemaData.getData());
+        return of(schemaData.getData(), schemaData.getType());
     }
 
-    private static SchemaHash of(byte[] schemaBytes) {
-        return new SchemaHash(hashFunction.hashBytes(schemaBytes));
+    private static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) {
+        return new SchemaHash(hashFunction.hashBytes(schemaBytes), schemaType);
     }
 
     public byte[] asBytes() {