You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/03/28 16:41:05 UTC

(pulsar) branch branch-2.10 updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new c44cacde66e [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
c44cacde66e is described below

commit c44cacde66e22b9e7a8c539267b0aeca6d6d426e
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

    [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
    
    (cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)
    
    # Conflicts:
    #       pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 35 +++++++++++++++++++++-
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 08d84f3e4fa..aafcb5eeb48 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2126,9 +2126,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
         }
 
+        final String topic = commandGetSchema.getTopic();
         String schemaName;
         try {
-            schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+            schemaName = TopicName.get(topic).getSchemaName();
         } catch (Throwable t) {
             commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
             return;
@@ -2137,7 +2138,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
             if (schemaAndMetadata == null) {
                 commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
-                        String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
+                        String.format("Topic not found or no-schema %s", topic));
             } else {
                 commandSender.sendGetSchemaResponse(requestId,
                         SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index c7cbc9b92c5..e6def654fee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
@@ -73,6 +74,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
@@ -101,6 +104,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     public void setup() throws Exception {
+        isTcpLookup = true;
         super.internalSetup();
 
         // Setup namespaces
@@ -109,6 +113,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
                 .allowedClusters(Collections.singleton(CLUSTER_NAME))
                 .build();
         admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+        admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
     }
 
     @AfterMethod(alwaysRun = true)
@@ -117,6 +122,34 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
+    @Test
+    public void testGetSchemaWithPatternTopic() throws Exception {
+        final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+        int topicNums = 10;
+        for (int i = 0; i < topicNums; i++) {
+            String topic = topicPrefix + "-" + i;
+            admin.topics().createNonPartitionedTopic(topic);
+        }
+
+        Pattern pattern = Pattern.compile(topicPrefix + "-.*"); // matches every topic created above
+        @Cleanup
+        Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topicsPattern(pattern)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        List<ConsumerImpl<GenericRecord>> consumers =
+                ((MultiTopicsConsumerImpl<GenericRecord>) consumer).getConsumers();
+        Assert.assertEquals(consumers.size(), topicNums); // TestNG order is (actual, expected)
+
+        for (int i = 0; i < topicNums; i++) {
+            String topic = topicPrefix + "-" + i;
+            admin.topics().delete(topic, true); // second argument is the force flag
+        }
+    }
+
     @Test
     public void testMultiTopicSetSchemaProvider() throws Exception {
         final String tenant = PUBLIC_TENANT;
@@ -898,7 +931,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
         producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
         producer.newMessage(Schema.BOOL).value(true).send();
-        
+
         Schema<Schemas.PersonThree> personThreeSchema = Schema.AVRO(Schemas.PersonThree.class);
         byte[] personThreeSchemaBytes = personThreeSchema.getSchemaInfo().getSchema();
         org.apache.avro.Schema personThreeSchemaAvroNative = new Parser().parse(new ByteArrayInputStream(personThreeSchemaBytes));