You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/27 08:06:37 UTC

[pulsar] 02/03: Fix primitive schema upload for ALWAYS_COMPATIBLE strategy. (#10386)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 592c3f8040a10826ec5488a6836209cddaddd263
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Apr 27 06:56:36 2021 +0800

    Fix primitive schema upload for ALWAYS_COMPATIBLE strategy. (#10386)
    
    * Fix primitive schema upload for ALWAYS_COMPATIBLE strategy.
    
    * Fix checkstyle.
    
    (cherry picked from commit f8716c258f095f8cb0ec44ea7e6b56cb431224f3)
---
 .../service/schema/BookkeeperSchemaStorage.java    |  7 +------
 .../SchemaTypeCompatibilityCheckTest.java          | 23 ++++++++++++++++++++--
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 0911ac6..2ecc927 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -294,12 +293,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(optLocatorEntry -> {
 
             if (optLocatorEntry.isPresent()) {
-                // Schema locator was already present
+
                 SchemaStorageFormat.SchemaLocator locator = optLocatorEntry.get().locator;
-                byte[] storedHash = locator.getInfo().getHash().toByteArray();
-                if (storedHash.length > 0 && Arrays.equals(storedHash, hash)) {
-                    return completedFuture(locator.getInfo().getVersion());
-                }
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, hash);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
index bc711c5..61cc989 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.schema.compatibility;
 
 import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -313,17 +315,34 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
         };
 
         for (Schema<?> schema : schemas) {
-            pulsarClient.newProducer(schema)
+            Producer<?> p = pulsarClient.newProducer(schema)
                     .topic(topicName)
                     .create();
+            p.close();
         }
 
         for (Schema<?> schema : schemas) {
-            pulsarClient.newConsumer(schema)
+            Consumer<?> c = pulsarClient.newConsumer(schema)
                     .topic(topicName)
                     .subscriptionName(UUID.randomUUID().toString())
                     .subscribe();
+            c.close();
         }
+
+        List<SchemaInfo> schemasOfTopic = admin.schemas().getAllSchemas(topicName);
+
+        // byte[] schema and ByteBuffer schema do not upload schema info to the schema registry
+        assertEquals(schemasOfTopic.size(), schemas.length - 2);
+
+        // Try to upload the schema again.
+        for (Schema<?> schema : schemas) {
+            Producer<?> p = pulsarClient.newProducer(schema)
+                    .topic(topicName)
+                    .create();
+            p.close();
+        }
+
+        assertEquals(schemasOfTopic.size(), schemas.length - 2);
     }
 
 }