Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/09 12:54:34 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request #14626: Fix inconsistent prompt message when schema version is empty using AVRO.

Technoboy- opened a new pull request #14626:
URL: https://github.com/apache/pulsar/pull/14626


   ### Motivation
    
   When a topic is created with an AVRO schema and a producer sends raw bytes directly to the topic, a consumer sees a different error message depending on whether it uses HTTP or TCP lookup. See the test below:
   
   ```
   # Consume
   Consumer<User> consumer = pulsarClient
                   .newConsumer(Schema.AVRO(User.class))
                   .topic("persistent://public/default/test")
                   .subscriptionName("sub-2")
                   .subscriptionType(SubscriptionType.Shared)
                   .subscribe();
   while(true){
        Message<User> message = consumer.receive();
        System.out.println("received msg : " + message.getValue());
   }
   
   # Produce
   bin/pulsar-client produce -n 5 -m "hello" test
   ```
   If the consumer uses TCP lookup, it gets the following message:
   ```
   Exception in thread "main" com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY
   	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
   	at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
   	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
   	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
   	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:83)
   ```
   But with HTTP lookup, the error message changes to:
   ```
   com.google.common.util.concurrent.UncheckedExecutionException: com.google.common.util.concurrent.UncheckedExecutionException: java.nio.BufferUnderflowException
   	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055)
   	at com.google.common.cache.LocalCache.get(LocalCache.java:3966)
   	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3989)
   	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4950)
   	at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.getSchemaReader(AbstractMultiVersionReader.java:82)
   ```
   The root cause is that when a producer sends raw bytes, the schema version is empty. With TCP lookup, the server throws an unchecked exception here (line 244):
   https://github.com/apache/pulsar/blob/7c1f17a6f0e2f1a5775277d61f51ac31636f39b4/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L236-L245
   
   With HTTP lookup, the error is thrown by the client here (line 164):
   https://github.com/apache/pulsar/blob/7c1f17a6f0e2f1a5775277d61f51ac31636f39b4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java#L161-L165
   
   To keep the error message consistent, we check for an empty schema version on the client side and throw the same exception.
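
A minimal sketch of the client-side guard described above, not the actual Pulsar code. It assumes the schema version is a long encoded in a byte array and decoded with `ByteBuffer.getLong()`, which would be consistent with the `BufferUnderflowException` in the HTTP lookup trace. The class, the method names, and `EmptySchemaVersionException` are hypothetical stand-ins; the message text "Empty schema version" is the one asserted in this PR's test.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class EmptySchemaVersionSketch {

    /** Hypothetical stand-in for the exception type shared by both lookup paths. */
    static class EmptySchemaVersionException extends RuntimeException {
        EmptySchemaVersionException(String message) {
            super(message);
        }
    }

    /** Decodes the version without a guard: an empty array underflows the buffer. */
    static long decodeUnchecked(byte[] schemaVersion) {
        return ByteBuffer.wrap(schemaVersion).getLong();
    }

    /** Decodes the version, rejecting a null or empty array with a descriptive error first. */
    static long decodeChecked(byte[] schemaVersion) {
        if (schemaVersion == null || schemaVersion.length == 0) {
            throw new EmptySchemaVersionException("Empty schema version");
        }
        return ByteBuffer.wrap(schemaVersion).getLong();
    }

    public static void main(String[] args) {
        byte[] empty = new byte[0];

        try {
            decodeUnchecked(empty);
        } catch (BufferUnderflowException e) {
            System.out.println("unchecked: " + e.getClass().getSimpleName());
        }

        try {
            decodeChecked(empty);
        } catch (EmptySchemaVersionException e) {
            System.out.println("checked: " + e.getMessage());
        }

        // A well-formed version (8 bytes) still decodes normally.
        byte[] versionThree = ByteBuffer.allocate(Long.BYTES).putLong(3L).array();
        System.out.println("decoded: " + decodeChecked(versionThree));
    }
}
```

Placing the guard before the decode means callers see the same descriptive error regardless of which lookup path fetched the schema, rather than a low-level buffer exception.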
   
   ### Modifications
   - Check for an empty schema version on the client side.
   - Throw the same exception in both lookup paths.
   - Add a server-side check in ServerCnx#handleGetSchema so that non-Java clients report the same error.
   
   ### Documentation
     
   - [x] `no-need-doc` 
     
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Technoboy- commented on a change in pull request #14626: Fix inconsistent prompt message when schema version is empty using AVRO.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14626:
URL: https://github.com/apache/pulsar/pull/14626#discussion_r824386897



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
##########
@@ -1112,4 +1114,102 @@ private void checkSchemaForAutoSchema(Message<GenericRecord> message) {
         }
     }
 
+    @Test
+    public void testAvroSchemaWithHttpLookup() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        String ns = PUBLIC_TENANT + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+        final String autoProducerTopic = getTopicName(ns, "testAvroSchemaWithHttpLookup");
+
+        @Cleanup
+        Consumer<User> consumer = pulsarClient
+                .newConsumer(Schema.AVRO(User.class))
+                .topic(autoProducerTopic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub-1")
+                .subscribe();
+
+        @Cleanup
+        Producer<User> userProducer = pulsarClient
+                .newProducer(Schema.AVRO(User.class))
+                .topic(autoProducerTopic)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(autoProducerTopic)
+                .enableBatching(false)
+                .create();
+        User test = new User("test");
+        userProducer.send(test);
+        producer.send("test".getBytes(StandardCharsets.UTF_8));
+        Message<User> message1 = consumer.receive();
+        Assert.assertEquals(test, message1.getValue());
+        try {
+            Message<User> message2 = consumer.receive();
+            message2.getValue();
+        } catch (Throwable ex) {
+            Assert.assertTrue(Throwables.getRootCause(ex) instanceof SchemaSerializationException);
+            Assert.assertEquals(Throwables.getRootCause(ex).getMessage(),"Empty schema version");
+        }
+    }
+
+    @Test
+    public void testAvroSchemaWithTcpLookup() throws Exception {

Review comment:
       Yes, right. Updated: I have specified `isTcpLookup=false` in `testAvroSchemaWithHttpLookup`.







[GitHub] [pulsar] gaoran10 commented on a change in pull request #14626: Fix inconsistent prompt message when schema version is empty using AVRO.

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #14626:
URL: https://github.com/apache/pulsar/pull/14626#discussion_r824383212



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
##########
@@ -1112,4 +1114,102 @@ private void checkSchemaForAutoSchema(Message<GenericRecord> message) {
         }
     }
 
+    @Test
+    public void testAvroSchemaWithHttpLookup() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        String ns = PUBLIC_TENANT + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+        final String autoProducerTopic = getTopicName(ns, "testAvroSchemaWithHttpLookup");
+
+        @Cleanup
+        Consumer<User> consumer = pulsarClient
+                .newConsumer(Schema.AVRO(User.class))
+                .topic(autoProducerTopic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub-1")
+                .subscribe();
+
+        @Cleanup
+        Producer<User> userProducer = pulsarClient
+                .newProducer(Schema.AVRO(User.class))
+                .topic(autoProducerTopic)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(autoProducerTopic)
+                .enableBatching(false)
+                .create();
+        User test = new User("test");
+        userProducer.send(test);
+        producer.send("test".getBytes(StandardCharsets.UTF_8));
+        Message<User> message1 = consumer.receive();
+        Assert.assertEquals(test, message1.getValue());
+        try {
+            Message<User> message2 = consumer.receive();
+            message2.getValue();
+        } catch (Throwable ex) {
+            Assert.assertTrue(Throwables.getRootCause(ex) instanceof SchemaSerializationException);
+            Assert.assertEquals(Throwables.getRootCause(ex).getMessage(),"Empty schema version");
+        }
+    }
+
+    @Test
+    public void testAvroSchemaWithTcpLookup() throws Exception {

Review comment:
       I'm not sure which test will be executed first. Maybe we could set the lookup mode explicitly in both `testAvroSchemaWithHttpLookup` and `testAvroSchemaWithTcpLookup` and merge their logic, or we need to specify the test execution order. Otherwise we can't be sure that `testAvroSchemaWithHttpLookup` actually uses HTTP lookup. For example, if `testAvroSchemaWithTcpLookup` executes first, the lookup mode will already have been changed to TCP lookup.
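
The ordering concern can be illustrated with a small standalone sketch; it is not the PR's test code. It assumes a shared mutable switch named `isTcpLookup` (the name mentioned in the reply in this thread); the class and the other method names are hypothetical. Each test sets the mode explicitly before running the shared scenario, so the result does not depend on which test ran earlier.

```java
public class LookupModeOrderSketch {

    /** Hypothetical shared switch, standing in for the test class's lookup setting. */
    static boolean isTcpLookup = false;

    /** Reports which lookup mode the simulated client would use right now. */
    static String currentLookupMode() {
        return isTcpLookup ? "TCP" : "HTTP";
    }

    /** Shared scenario: sets the mode explicitly, then runs the common logic. */
    static String runScenario(boolean tcpLookup) {
        isTcpLookup = tcpLookup;
        return currentLookupMode();
    }

    static String testWithHttpLookup() {
        return runScenario(false);
    }

    static String testWithTcpLookup() {
        return runScenario(true);
    }

    public static void main(String[] args) {
        // Run in the problematic order: TCP first, then HTTP.
        System.out.println(testWithTcpLookup());
        System.out.println(testWithHttpLookup());
    }
}
```

Because both tests go through `runScenario`, the merged logic lives in one place and neither test inherits state left behind by the other.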










[GitHub] [pulsar] codelipenghui merged pull request #14626: Fix inconsistent prompt message when schema version is empty using AVRO.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #14626:
URL: https://github.com/apache/pulsar/pull/14626


   




