You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/11/29 06:57:00 UTC

(pulsar) branch master updated: [fix][broker] Fixed getting incorrect KeyValue schema version (#21632)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 81a9a527b35 [fix][broker] Fixed getting incorrect KeyValue schema version (#21632)
81a9a527b35 is described below

commit 81a9a527b353c07db0743f970edb009a929888fe
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Nov 29 14:56:54 2023 +0800

    [fix][broker] Fixed getting incorrect KeyValue schema version (#21632)
---
 .../broker/admin/impl/SchemasResourceBase.java     |  9 +++++--
 .../broker/service/schema/SchemaServiceTest.java   | 29 ++++++++++++++++++++++
 .../pulsar/client/impl/HttpLookupService.java      | 21 ++++++----------
 .../pulsar/client/impl/schema/SchemaUtils.java     |  3 +--
 4 files changed, 45 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 0bab772044a..1992ea7e477 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -161,10 +162,14 @@ public class SchemasResourceBase extends AdminResource {
         return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
                 .thenCompose(__ -> {
                     String schemaId = getSchemaId();
+                    final SchemaType schemaType = SchemaType.valueOf(payload.getType());
+                    byte[] data = payload.getSchema().getBytes(StandardCharsets.UTF_8);
+                    if (schemaType.getValue() == SchemaType.KEY_VALUE.getValue()) {
+                        data = SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(data);
+                    }
                     return pulsar().getSchemaRegistryService()
                             .findSchemaVersion(schemaId,
-                                    SchemaData.builder().data(payload.getSchema().getBytes(StandardCharsets.UTF_8))
-                                            .isDeleted(false).timestamp(clock.millis())
+                                    SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis())
                                             .type(SchemaType.valueOf(payload.getType()))
                                             .user(defaultIfEmpty(clientAppId(), ""))
                                             .props(payload.getProperties()).build());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index c7e30d5c3fc..2bdb24dceeb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -40,16 +40,21 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -93,6 +98,7 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
         Map<SchemaType, SchemaCompatibilityCheck> checkMap = new HashMap<>();
         checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck());
         schemaRegistryService = new SchemaRegistryServiceImpl(storage, checkMap, MockClock, null);
+        setupDefaultTenantAndNamespace();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -385,4 +391,27 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
     private SchemaVersion version(long version) {
         return new LongSchemaVersion(version);
     }
+
+    @Test
+    public void testKeyValueSchema() throws Exception {
+        // Use one unique name for both topic creation and every schema call, so they target the same topic.
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/testKeyValueSchema");
+        admin.topics().createNonPartitionedTopic(topicName);
+        final SchemaInfo schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
+                "keyValue",
+                SchemaInfo.builder().type(SchemaType.STRING).schema(new byte[0])
+                        .build(),
+                SchemaInfo.builder().type(SchemaType.BOOLEAN).schema(new byte[0])
+                        .build(), KeyValueEncodingType.SEPARATED);
+        admin.schemas().createSchema(topicName, schemaInfo);
+        // The first schema uploaded for the topic is expected to be reported as version 0.
+        final SchemaInfoWithVersion schemaInfoWithVersion = admin.schemas().getSchemaInfoWithVersion(topicName);
+        Assert.assertEquals(schemaInfoWithVersion.getVersion(), 0);
+        // Looking the version up with the locally built KeyValue schema info must yield that same version.
+        final Long version1 = admin.schemas().getVersionBySchema(topicName, schemaInfo);
+        Assert.assertEquals(version1, 0);
+        // Looking it up with the schema info returned by the broker must yield the same version as well.
+        final Long version2 = admin.schemas().getVersionBySchema(topicName, schemaInfoWithVersion.getSchemaInfo());
+        Assert.assertEquals(version2, 0);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 7969ce40236..e33efabcc9e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.impl;
 
 import io.netty.channel.EventLoopGroup;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
@@ -178,18 +177,14 @@ public class HttpLookupService implements LookupService {
         }
         httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
             if (response.getType() == SchemaType.KEY_VALUE) {
-                try {
-                    SchemaData data = SchemaData
-                            .builder()
-                            .data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(
-                                    response.getData().getBytes(StandardCharsets.UTF_8)))
-                            .type(response.getType())
-                            .props(response.getProperties())
-                            .build();
-                    future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data)));
-                } catch (IOException err) {
-                    future.completeExceptionally(err);
-                }
+                SchemaData data = SchemaData
+                        .builder()
+                        .data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(
+                                response.getData().getBytes(StandardCharsets.UTF_8)))
+                        .type(response.getType())
+                        .props(response.getProperties())
+                        .build();
+                future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data)));
             } else {
                 future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response)));
             }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
index 8acbf26559b..881ad424669 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
@@ -359,8 +359,7 @@ public final class SchemaUtils {
      * @param keyValueSchemaInfoDataJsonBytes the key/value schema info data json bytes
      * @return the key/value schema info data bytes
      */
-    public static byte[] convertKeyValueDataStringToSchemaInfoSchema(
-            byte[] keyValueSchemaInfoDataJsonBytes) throws IOException {
+    public static byte[] convertKeyValueDataStringToSchemaInfoSchema(byte[] keyValueSchemaInfoDataJsonBytes) {
         JsonObject jsonObject = (JsonObject) toJsonElement(new String(keyValueSchemaInfoDataJsonBytes, UTF_8));
         byte[] keyBytes = getKeyOrValueSchemaBytes(jsonObject.get("key"));
         byte[] valueBytes = getKeyOrValueSchemaBytes(jsonObject.get("value"));