You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/11/29 06:57:00 UTC
(pulsar) branch master updated: [fix][broker] Fixed getting incorrect KeyValue schema version (#21632)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 81a9a527b35 [fix][broker] Fixed getting incorrect KeyValue schema version (#21632)
81a9a527b35 is described below
commit 81a9a527b353c07db0743f970edb009a929888fe
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Nov 29 14:56:54 2023 +0800
[fix][broker] Fixed getting incorrect KeyValue schema version (#21632)
---
.../broker/admin/impl/SchemasResourceBase.java | 9 +++++--
.../broker/service/schema/SchemaServiceTest.java | 29 ++++++++++++++++++++++
.../pulsar/client/impl/HttpLookupService.java | 21 ++++++----------
.../pulsar/client/impl/schema/SchemaUtils.java | 3 +--
4 files changed, 45 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 0bab772044a..1992ea7e477 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -161,10 +162,14 @@ public class SchemasResourceBase extends AdminResource {
return validateOwnershipAndOperationAsync(authoritative, TopicOperation.GET_METADATA)
.thenCompose(__ -> {
String schemaId = getSchemaId();
+ final SchemaType schemaType = SchemaType.valueOf(payload.getType());
+ byte[] data = payload.getSchema().getBytes(StandardCharsets.UTF_8);
+ if (schemaType.getValue() == SchemaType.KEY_VALUE.getValue()) {
+ data = SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(data);
+ }
return pulsar().getSchemaRegistryService()
.findSchemaVersion(schemaId,
- SchemaData.builder().data(payload.getSchema().getBytes(StandardCharsets.UTF_8))
- .isDeleted(false).timestamp(clock.millis())
+ SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis())
.type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), ""))
.props(payload.getProperties()).build());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index c7e30d5c3fc..2bdb24dceeb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -40,16 +40,21 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -93,6 +98,7 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
Map<SchemaType, SchemaCompatibilityCheck> checkMap = new HashMap<>();
checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck());
schemaRegistryService = new SchemaRegistryServiceImpl(storage, checkMap, MockClock, null);
+ setupDefaultTenantAndNamespace();
}
@AfterMethod(alwaysRun = true)
@@ -385,4 +391,27 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
private SchemaVersion version(long version) {
return new LongSchemaVersion(version);
}
+
+    @Test
+    public void testKeyValueSchema() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/testKeyValueSchema");
+        admin.topics().createNonPartitionedTopic(topicName);
+        // Register a SEPARATED KeyValue schema (STRING key, BOOLEAN value) on the topic created above.
+        final SchemaInfo schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
+                "keyValue",
+                SchemaInfo.builder().type(SchemaType.STRING).schema(new byte[0])
+                        .build(),
+                SchemaInfo.builder().type(SchemaType.BOOLEAN).schema(new byte[0])
+                        .build(), KeyValueEncodingType.SEPARATED);
+        admin.schemas().createSchema(topicName, schemaInfo);
+        // The first schema registered on the topic is expected to be reported as version 0.
+        final SchemaInfoWithVersion schemaInfoWithVersion = admin.schemas().getSchemaInfoWithVersion(topicName);
+        Assert.assertEquals(schemaInfoWithVersion.getVersion(), 0);
+        // Looking the version up with the locally built SchemaInfo must resolve to that same version.
+        final Long version1 = admin.schemas().getVersionBySchema(topicName, schemaInfo);
+        Assert.assertEquals(version1, 0);
+        // The SchemaInfo returned by the admin API must resolve to that version as well.
+        final Long version2 = admin.schemas().getVersionBySchema(topicName, schemaInfoWithVersion.getSchemaInfo());
+        Assert.assertEquals(version2, 0);
+    }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 7969ce40236..e33efabcc9e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.client.impl;
import io.netty.channel.EventLoopGroup;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -178,18 +177,14 @@ public class HttpLookupService implements LookupService {
}
httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> {
if (response.getType() == SchemaType.KEY_VALUE) {
- try {
- SchemaData data = SchemaData
- .builder()
- .data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(
- response.getData().getBytes(StandardCharsets.UTF_8)))
- .type(response.getType())
- .props(response.getProperties())
- .build();
- future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data)));
- } catch (IOException err) {
- future.completeExceptionally(err);
- }
+ SchemaData data = SchemaData
+ .builder()
+ .data(SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(
+ response.getData().getBytes(StandardCharsets.UTF_8)))
+ .type(response.getType())
+ .props(response.getProperties())
+ .build();
+ future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, data)));
} else {
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response)));
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
index 8acbf26559b..881ad424669 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
@@ -359,8 +359,7 @@ public final class SchemaUtils {
* @param keyValueSchemaInfoDataJsonBytes the key/value schema info data json bytes
* @return the key/value schema info data bytes
*/
- public static byte[] convertKeyValueDataStringToSchemaInfoSchema(
- byte[] keyValueSchemaInfoDataJsonBytes) throws IOException {
+ public static byte[] convertKeyValueDataStringToSchemaInfoSchema(byte[] keyValueSchemaInfoDataJsonBytes) {
JsonObject jsonObject = (JsonObject) toJsonElement(new String(keyValueSchemaInfoDataJsonBytes, UTF_8));
byte[] keyBytes = getKeyOrValueSchemaBytes(jsonObject.get("key"));
byte[] valueBytes = getKeyOrValueSchemaBytes(jsonObject.get("value"));