You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/11/30 08:32:29 UTC

(pulsar) branch master updated: [fix][admin] Fix KeyValue schema compatibility check caused OOM (#21645)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e27070afffb [fix][admin] Fix KeyValue schema compatibility check caused OOM (#21645)
e27070afffb is described below

commit e27070afffbda7ebe22d8ba001bb26123e5e3674
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Nov 30 16:32:21 2023 +0800

    [fix][admin] Fix KeyValue schema compatibility check caused OOM (#21645)
---
 .../org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java  | 7 ++++++-
 .../apache/pulsar/broker/service/schema/SchemaServiceTest.java    | 8 ++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 1992ea7e477..454b8f0fac8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -147,8 +147,13 @@ public class SchemasResourceBase extends AdminResource {
                 .thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
                 .thenCompose(strategy -> {
                     String schemaId = getSchemaId();
+                    final SchemaType schemaType = SchemaType.valueOf(payload.getType());
+                    byte[] data = payload.getSchema().getBytes(StandardCharsets.UTF_8);
+                    if (schemaType.getValue() == SchemaType.KEY_VALUE.getValue()) {
+                        data = SchemaUtils.convertKeyValueDataStringToSchemaInfoSchema(data);
+                    }
                     return pulsar().getSchemaRegistryService().isCompatible(schemaId,
-                            SchemaData.builder().data(payload.getSchema().getBytes(StandardCharsets.UTF_8))
+                            SchemaData.builder().data(data)
                                     .isDeleted(false)
                                     .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
                                     .user(defaultIfEmpty(clientAppId(), ""))
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 2bdb24dceeb..fbf734f331f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertFalse;
@@ -46,9 +47,11 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -403,8 +406,12 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
                         .build(),
                 SchemaInfo.builder().type(SchemaType.BOOLEAN).schema(new byte[0])
                         .build(), KeyValueEncodingType.SEPARATED);
+        assertThrows(PulsarAdminException.ServerSideErrorException.class, () -> admin.schemas().testCompatibility(topicName, schemaInfo));
         admin.schemas().createSchema(topicName, schemaInfo);
 
+        final IsCompatibilityResponse isCompatibilityResponse = admin.schemas().testCompatibility(topicName, schemaInfo);
+        Assert.assertTrue(isCompatibilityResponse.isCompatibility());
+
         final SchemaInfoWithVersion schemaInfoWithVersion = admin.schemas().getSchemaInfoWithVersion(topicName);
         Assert.assertEquals(schemaInfoWithVersion.getVersion(), 0);
 
@@ -413,5 +420,6 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
 
         final Long version2 = admin.schemas().getVersionBySchema(topicName, schemaInfoWithVersion.getSchemaInfo());
         Assert.assertEquals(version2, 0);
+
     }
 }