You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/03/28 16:41:05 UTC
(pulsar) branch branch-2.10 updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new c44cacde66e [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
c44cacde66e is described below
commit c44cacde66e22b9e7a8c539267b0aeca6d6d426e
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Mar 28 23:14:19 2024 +0800
[fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
(cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)
# Conflicts:
# pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
---
.../apache/pulsar/broker/service/ServerCnx.java | 5 ++--
.../java/org/apache/pulsar/schema/SchemaTest.java | 35 +++++++++++++++++++++-
2 files changed, 37 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 08d84f3e4fa..aafcb5eeb48 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2126,9 +2126,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
}
+ final String topic = commandGetSchema.getTopic();
String schemaName;
try {
- schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+ schemaName = TopicName.get(topic).getSchemaName();
} catch (Throwable t) {
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
return;
@@ -2137,7 +2138,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
if (schemaAndMetadata == null) {
commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
- String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
+ String.format("Topic not found or no-schema %s", topic));
} else {
commandSender.sendGetSchemaResponse(requestId,
SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index c7cbc9b92c5..e6def654fee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
@@ -73,6 +74,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
@@ -101,6 +104,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
public void setup() throws Exception {
+ isTcpLookup = true;
super.internalSetup();
// Setup namespaces
@@ -109,6 +113,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
.allowedClusters(Collections.singleton(CLUSTER_NAME))
.build();
admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+ admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
}
@AfterMethod(alwaysRun = true)
@@ -117,6 +122,34 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
super.internalCleanup();
}
+ @Test
+ public void testGetSchemaWithPatternTopic() throws Exception {
+ final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+ int topicNums = 10;
+ for (int i = 0; i < topicNums; i++) {
+ String topic = topicPrefix + "-" + i;
+ admin.topics().createNonPartitionedTopic(topic);
+ }
+
+ Pattern pattern = Pattern.compile(topicPrefix + "-.*");
+ @Cleanup
+ Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topicsPattern(pattern)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ List<ConsumerImpl<GenericRecord>> consumers =
+ ((MultiTopicsConsumerImpl<GenericRecord>) consumer).getConsumers();
+ Assert.assertEquals(topicNums, consumers.size());
+
+ for (int i = 0; i < topicNums; i++) {
+ String topic = topicPrefix + "-" + i;
+ admin.topics().delete(topic, true);
+ }
+ }
+
@Test
public void testMultiTopicSetSchemaProvider() throws Exception {
final String tenant = PUBLIC_TENANT;
@@ -898,7 +931,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
producer.newMessage(Schema.BYTES).value("test".getBytes(StandardCharsets.UTF_8)).send();
producer.newMessage(Schema.BOOL).value(true).send();
-
+
Schema<Schemas.PersonThree> personThreeSchema = Schema.AVRO(Schemas.PersonThree.class);
byte[] personThreeSchemaBytes = personThreeSchema.getSchemaInfo().getSchema();
org.apache.avro.Schema personThreeSchemaAvroNative = new Parser().parse(new ByteArrayInputStream(personThreeSchemaBytes));