You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/18 07:59:24 UTC

[pulsar] 02/16: Fix inconsistent prompt message when schema version is empty using AVRO. (#14626)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit aafba1ac18dc729b10de62eca0e2df91025e5bce
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Mar 12 11:11:33 2022 +0800

    Fix inconsistent prompt message when schema version is empty using AVRO. (#14626)
    
    (cherry picked from commit 190e5dbccda455a84ea7fdf491dac52cc50fbbdf)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 76 +++++++++++++++++++++-
 .../client/impl/BinaryProtoLookupService.java      |  8 ++-
 .../pulsar/client/impl/HttpLookupService.java      |  5 ++
 4 files changed, 89 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2ba51d2..c18d525 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1923,6 +1923,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         long requestId = commandGetSchema.getRequestId();
         SchemaVersion schemaVersion = SchemaVersion.Latest;
         if (commandGetSchema.hasSchemaVersion()) {
+            if (commandGetSchema.getSchemaVersion().length == 0) {
+                commandSender.sendGetSchemaErrorResponse(requestId, ServerError.IncompatibleSchema,
+                        "Empty schema version");
+                return;
+            }
             schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index ab78988..c45888f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -29,12 +29,13 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 
+import com.google.common.base.Throwables;
+import lombok.EqualsAndHashCode;
 import org.apache.avro.Schema.Parser;
-
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.Sets;
-
 import java.io.ByteArrayInputStream;
+import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,7 +45,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
@@ -59,7 +59,9 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -1064,4 +1066,72 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testAvroSchemaWithHttpLookup() throws Exception {
+        stopBroker();
+        isTcpLookup = false;
+        setup();
+        testEmptySchema();
+    }
+
+    @Test
+    public void testAvroSchemaWithTcpLookup() throws Exception {
+        stopBroker();
+        isTcpLookup = true;
+        setup();
+        testEmptySchema();
+    }
+
+    private void testEmptySchema() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        String ns = PUBLIC_TENANT + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+
+        final String autoProducerTopic = getTopicName(ns, "testEmptySchema");
+
+        @Cleanup
+        Consumer<User> consumer = pulsarClient
+                .newConsumer(Schema.AVRO(User.class))
+                .topic(autoProducerTopic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub-1")
+                .subscribe();
+
+        @Cleanup
+        Producer<User> userProducer = pulsarClient
+                .newProducer(Schema.AVRO(User.class))
+                .topic(autoProducerTopic)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(autoProducerTopic)
+                .enableBatching(false)
+                .create();
+
+        User test = new User("test");
+        userProducer.send(test);
+        producer.send("test".getBytes(StandardCharsets.UTF_8));
+        Message<User> message1 = consumer.receive();
+        Assert.assertEquals(test, message1.getValue());
+        try {
+            Message<User> message2 = consumer.receive();
+            message2.getValue();
+        } catch (Throwable ex) {
+            Assert.assertTrue(Throwables.getRootCause(ex) instanceof SchemaSerializationException);
+            Assert.assertEquals(Throwables.getRootCause(ex).getMessage(),"Empty schema version");
+        }
+    }
+
+    @EqualsAndHashCode
+    static class User implements Serializable {
+        private String name;
+        public User() {}
+        public User(String name) {
+            this.name = name;
+        }
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index eb61a66..ba3281c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
@@ -217,9 +218,12 @@ public class BinaryProtoLookupService implements LookupService {
 
     @Override
     public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) {
-        InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
         CompletableFuture<Optional<SchemaInfo>> schemaFuture = new CompletableFuture<>();
-
+        if (version != null && version.length == 0) {
+            schemaFuture.completeExceptionally(new SchemaSerializationException("Empty schema version"));
+            return schemaFuture;
+        }
+        InetSocketAddress socketAddress = serviceNameResolver.resolveHost();
         client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index f2cc169..72326c3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.impl.schema.SchemaUtils;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -162,6 +163,10 @@ public class HttpLookupService implements LookupService {
         String schemaName = topicName.getSchemaName();
         String path = String.format("admin/v2/schemas/%s/schema", schemaName);
         if (version != null) {
+            if (version.length == 0) {
+                future.completeExceptionally(new SchemaSerializationException("Empty schema version"));
+                return future;
+            }
             path = String.format("admin/v2/schemas/%s/schema/%s",
                     schemaName,
                     ByteBuffer.wrap(version).getLong());