You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/07/07 17:30:05 UTC

[GitHub] [pulsar] fmiguelez opened a new issue #7407: Support for tombstones (null value in messages) does not work

fmiguelez opened a new issue #7407:
URL: https://github.com/apache/pulsar/issues/7407


   **Describe the bug**
   
   The solution provided by #7139 to the BUG #4803 *does not work*.
   
   * When trying to read a message with `null` value a `NullPointerException` is thrown in other part of the code.
   ```
           msg = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS));
   
   	java.lang.NullPointerException
   		at org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl.updateNumMsgsReceived(ConsumerStatsRecorderImpl.java:169)
   		at org.apache.pulsar.client.impl.ConsumerImpl.messageProcessed(ConsumerImpl.java:1423)
   		at org.apache.pulsar.client.impl.ConsumerImpl.internalReceive(ConsumerImpl.java:431)
   		at org.apache.pulsar.client.impl.ConsumerBase.receive(ConsumerBase.java:175)
                   ...
   ```
   * It should not be required to explicitly indicate a null value to producer (only-key values should work just just fine). Exception thrown when working with implicit null value messages is EOFException in this case (the same before this a solution was provided).
   
   **To Reproduce**
   I have created a test project to reproduce these issues (`null` values implicitly and explictly set with both schema and schemaless consumer): [pulsar-tombstone-test](https://github.com/fmiguelez/pulsar-tombstone-test)
   
   Read README.md to reproduce it.
   
   **Expected behavior**
   Tombstones (`null` values in mesages with our without schema but with key) should be supported whether you indicate an schema or not and whether you explicitly indicate a `null` value or not (implicit `null` value). All tests should pass in the example project.
   
   ```
   Producer<byte []> schemalessProducer = client.newProducer(Schema.BYTES).topic(TOPIC).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
   
   Consumer<byte []> schemalessConsumer = client.newConsumer(Schema.BYTES).topic(TOPIC).subscriptionName("test")
   				.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
   
   // Implicit tombstone  without schema
   schemalessProducer.key("1").send();
   
   // Explicit tombstone (the one supposedly to work)
   schemalessProducer.key("2").value(null).send();
   
   Message<byte[]> implicitTombstone = schemalessReceiver.receive(2, TimeUnit.SECONDS).getValue());
   Message<byte[]> explictitTombstone = schemalessReceiver.receive(2, TimeUnit.SECONDS).getValue())
   
   System.out.println(String.format("Implicit tombstone: {key=%s, value=%s}", implicitTombstone.getKey(), implicitTombstone.getValue()));
   System.out.println(String.format("Explicit tombstone: {key=%s, value=%s}", implicitTombstone.getKey(), implicitTombstone.getValue()));
   ```
   
   ```
   Producer<DummyObject> schemaProducer = client.newProducer(Schema.AVRO(DummyObject.class)).topic(TOPIC).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
   
   Consumer<DummyObject> schemaConsumer = client.newConsumer(Schema.AVRO(DummyObject.class)).topic(TOPIC).subscriptionName("test")
   				.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
   
   // Implicit tombstone  without schema
   schemaProducer.key("1").send();
   
   // Explicit tombstone (the one supposedly to work)
   schemaProducer.key("2").value(null).send();
   
   Message<DummyObject> implicitTombstone = schemalessReceiver.receive(2, TimeUnit.SECONDS).getValue());
   Message<DummyObject> explictitTombstone = schemalessReceiver.receive(2, TimeUnit.SECONDS).getValue())
   
   System.out.println(String.format("Implicit tombstone: {key=%s, value=%s}", implicitTombstone.getKey(), implicitTombstone.getValue()));
   System.out.println(String.format("Explicit tombstone: {key=%s, value=%s}", implicitTombstone.getKey(), implicitTombstone.getValue()));
   ```
   
   
   **Screenshots**
   
   **Desktop (please complete the following information):**
    - Windows 10 with Docker Deskto to run Pulsar containers
   
   **Additional context**
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 edited a comment on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
gaoran10 edited a comment on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-698094497


   @fmiguelez Sorry, late response.
   
   1. `producer.newMessage().key("1").send()`
   
   Currently, if we send messages as above the message payload is `ByteBuffer.allocate(0)`, thus if the topic with Avro schema, the decoder can't decode an empty byte array, this will cause java.io.EOFException as below.
   
   ```
   org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException
   
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:77)
   	at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:36)
   	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:107)
   	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
   	at org.apache.pulsar.broker.transaction.TransactionProduceTest.test(TransactionProduceTest.java:639)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
   	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
   	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
   	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
   	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
   	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
   	at org.testng.TestRunner.privateRun(TestRunner.java:648)
   	at org.testng.TestRunner.run(TestRunner.java:505)
   	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
   	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
   	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
   	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
   	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
   	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
   	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
   	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
   	at org.testng.TestNG.runSuites(TestNG.java:1049)
   	at org.testng.TestNG.run(TestNG.java:1017)
   	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
   	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
   Caused by: java.io.EOFException
   	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
   	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:149)
   	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:460)
   	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
   	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:290)
   	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
   	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:75)
   	... 28 more
   ```
   
   2. `producer.newMessage().key("1").value(null).send()`
   
   If we specify the value is null, the messageMetaData will record the flag `msgMetadataBuilder.setNullValue(true)`, the consumer client will check the nullValue flag before schema decoder decode the payload, if the flag is true will return null value directly.
   
   3. Currently, the message value has three forms, normal value, empty byte array, and null value, if topics use the struct schema consumer could return a null value for empty byte array and null value, if topics use the bytes schema consumer could return the original message payload (empty byte array or null value). please confirm @sijie 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 edited a comment on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
gaoran10 edited a comment on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-698094497


   @fmiguelez Sorry, late response.
   
   1. `producer.newMessage().key("1").send()`
   
   Currently, if we send messages as above the message payload is `ByteBuffer.allocate(0)`, thus if the topic with Avro schema, the decoder can't decode an empty byte array, this will cause java.io.EOFException as below.
   
   ```
   org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException
   
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:77)
   	at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:36)
   	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:107)
   	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
   	at org.apache.pulsar.broker.transaction.TransactionProduceTest.test(TransactionProduceTest.java:639)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
   	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
   	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
   	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
   	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
   	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
   	at org.testng.TestRunner.privateRun(TestRunner.java:648)
   	at org.testng.TestRunner.run(TestRunner.java:505)
   	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
   	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
   	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
   	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
   	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
   	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
   	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
   	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
   	at org.testng.TestNG.runSuites(TestNG.java:1049)
   	at org.testng.TestNG.run(TestNG.java:1017)
   	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
   	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
   Caused by: java.io.EOFException
   	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
   	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:149)
   	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:460)
   	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
   	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:290)
   	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
   	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:75)
   	... 28 more
   ```
   
   2. `producer.newMessage().key("1").value(null).send()`
   
   If we specify the value is null, the messageMetaData will record the flag `msgMetadataBuilder.setNullValue(true)`, the consumer client will check the nullValue flag before schema decoder decode the payload, if the flag is true will return null value directly.
   
   3. Currently, the message value has three forms, normal value, empty byte array, and null value, if topics use the Avro schema consumer couldn't handle the empty byte array payload, maybe the consumer should return a null value for empty byte array.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
sijie commented on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-655013920


   It was closed because of `Fixes` comment in the description. I have reopened it.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-698094497


   Sorry, late response.
   
   1. `producer.newMessage().key("1").send()`
   
   Currently, if we send messages as above the message payload is `ByteBuffer.allocate(0)`, thus if the topic with schema the decoder can't decode an empty byte array, this will cause java.io.EOFException as below.
   
   ```
   org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException
   
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:77)
   	at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:36)
   	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:107)
   	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
   	at org.apache.pulsar.broker.transaction.TransactionProduceTest.test(TransactionProduceTest.java:639)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
   	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
   	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
   	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
   	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
   	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
   	at org.testng.TestRunner.privateRun(TestRunner.java:648)
   	at org.testng.TestRunner.run(TestRunner.java:505)
   	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
   	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
   	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
   	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
   	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
   	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
   	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
   	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
   	at org.testng.TestNG.runSuites(TestNG.java:1049)
   	at org.testng.TestNG.run(TestNG.java:1017)
   	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
   	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
   Caused by: java.io.EOFException
   	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
   	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:149)
   	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:460)
   	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
   	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:290)
   	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
   	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:75)
   	... 28 more
   ```
   
   2. `producer.newMessage().key("1").value(null).send()`
   
   If we specify the value is null, the messageMetaData will record the flag `msgMetadataBuilder.setNullValue(true)`, the consumer client will check the nullValue flag before schema decoder decode the payload, if the flag is true will return null directly.
   
   3. Currently, the message value has three forms, normal value, empty byte array, and null value, maybe we could return a null value if the schema message payload is empty byte array or null value, if the topic not use the schema consumer could return the original message payload (empty byte array or null value).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 edited a comment on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
gaoran10 edited a comment on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-698094497


   @fmiguelez Sorry, late response.
   
   1. `producer.newMessage().key("1").send()`
   
   Currently, if we send messages as above the message payload is `ByteBuffer.allocate(0)`, thus if the topic with schema, the decoder can't decode an empty byte array, this will cause java.io.EOFException as below.
   
   ```
   org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException
   
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:77)
   	at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:36)
   	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:107)
   	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
   	at org.apache.pulsar.broker.transaction.TransactionProduceTest.test(TransactionProduceTest.java:639)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
   	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
   	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
   	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
   	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
   	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
   	at org.testng.TestRunner.privateRun(TestRunner.java:648)
   	at org.testng.TestRunner.run(TestRunner.java:505)
   	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
   	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
   	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
   	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
   	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
   	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
   	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
   	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
   	at org.testng.TestNG.runSuites(TestNG.java:1049)
   	at org.testng.TestNG.run(TestNG.java:1017)
   	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
   	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
   Caused by: java.io.EOFException
   	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
   	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:149)
   	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:460)
   	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
   	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:290)
   	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
   	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:75)
   	... 28 more
   ```
   
   2. `producer.newMessage().key("1").value(null).send()`
   
   If we specify the value is null, the messageMetaData will record the flag `msgMetadataBuilder.setNullValue(true)`, the consumer client will check the nullValue flag before schema decoder decode the payload, if the flag is true will return null directly.
   
   3. Currently, the message value has three forms, normal value, empty byte array, and null value, maybe we could return a null value if the schema message payload is empty byte array or null value, if the topic not use the schema consumer could return the original message payload (empty byte array or null value). please confirm @sijie 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] longtengz commented on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
longtengz commented on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-986551359


   I think pulsar sql can't deal with empty payload messages because of the same issues here described here. See my issue here #13127. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] fmiguelez commented on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
fmiguelez commented on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-654775988


   I do not think that this ticket should be closed as pull request only solves one of the cases described by the tests


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-698094497


   Sorry, late response.
   
   1. `producer.newMessage().key("1").send()`
   
   Currently, if we send messages as above the message payload is `ByteBuffer.allocate(0)`, thus if the topic with schema the decoder can't decode an empty byte array, this will cause java.io.EOFException as below.
   
   ```
   org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException
   
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:77)
   	at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:36)
   	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:107)
   	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
   	at org.apache.pulsar.broker.transaction.TransactionProduceTest.test(TransactionProduceTest.java:639)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
   	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
   	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
   	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
   	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
   	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
   	at org.testng.TestRunner.privateRun(TestRunner.java:648)
   	at org.testng.TestRunner.run(TestRunner.java:505)
   	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
   	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
   	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
   	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
   	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
   	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
   	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
   	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
   	at org.testng.TestNG.runSuites(TestNG.java:1049)
   	at org.testng.TestNG.run(TestNG.java:1017)
   	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
   	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
   Caused by: java.io.EOFException
   	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
   	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:149)
   	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:460)
   	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
   	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:290)
   	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
   	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:75)
   	... 28 more
   ```
   
   2. `producer.newMessage().key("1").value(null).send()`
   
   If we specify the value is null, the messageMetaData will record the flag `msgMetadataBuilder.setNullValue(true)`, the consumer client will check the nullValue flag before schema decoder decode the payload, if the flag is true will return null directly.
   
   3. Currently, the message value has three forms, normal value, empty byte array, and null value, maybe we could return a null value if the schema message payload is empty byte array or null value, if the topic not use the schema consumer could return the original message payload (empty byte array or null value).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 edited a comment on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
gaoran10 edited a comment on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-698094497






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] longtengz edited a comment on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
longtengz edited a comment on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-986551359


   I think pulsar sql can't deal with empty payload messages because of the same issues described here. See my issue #13127. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 edited a comment on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
gaoran10 edited a comment on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-698094497


   @fmiguelez Sorry, late response.
   
   1. `producer.newMessage().key("1").send()`
   
   Currently, if we send messages as above the message payload is `ByteBuffer.allocate(0)`, thus if the topic with schema the decoder can't decode an empty byte array, this will cause java.io.EOFException as below.
   
   ```
   org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException
   
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:77)
   	at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:36)
   	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:107)
   	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
   	at org.apache.pulsar.broker.transaction.TransactionProduceTest.test(TransactionProduceTest.java:639)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
   	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
   	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
   	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
   	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
   	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
   	at org.testng.TestRunner.privateRun(TestRunner.java:648)
   	at org.testng.TestRunner.run(TestRunner.java:505)
   	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
   	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
   	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
   	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
   	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
   	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
   	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
   	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
   	at org.testng.TestNG.runSuites(TestNG.java:1049)
   	at org.testng.TestNG.run(TestNG.java:1017)
   	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
   	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
   Caused by: java.io.EOFException
   	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
   	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:149)
   	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:460)
   	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
   	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:290)
   	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
   	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:75)
   	... 28 more
   ```
   
   2. `producer.newMessage().key("1").value(null).send()`
   
   If we specify the value is null, the messageMetaData will record the flag `msgMetadataBuilder.setNullValue(true)`, the consumer client will check the nullValue flag before schema decoder decode the payload, if the flag is true will return null directly.
   
   3. Currently, the message value has three forms, normal value, empty byte array, and null value, maybe we could return a null value if the schema message payload is empty byte array or null value, if the topic not use the schema consumer could return the original message payload (empty byte array or null value). please confirm @sijie 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 edited a comment on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
gaoran10 edited a comment on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-698094497


   @fmiguelez Sorry, late response.
   
   1. `producer.newMessage().key("1").send()`
   
   Currently, if we send messages as above the message payload is `ByteBuffer.allocate(0)`, thus if the topic with schema, the decoder can't decode an empty byte array, this will cause java.io.EOFException as below.
   
   ```
   org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException
   
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:77)
   	at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:36)
   	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:107)
   	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
   	at org.apache.pulsar.broker.transaction.TransactionProduceTest.test(TransactionProduceTest.java:639)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
   	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
   	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
   	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
   	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
   	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
   	at org.testng.TestRunner.privateRun(TestRunner.java:648)
   	at org.testng.TestRunner.run(TestRunner.java:505)
   	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
   	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
   	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
   	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
   	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
   	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
   	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
   	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
   	at org.testng.TestNG.runSuites(TestNG.java:1049)
   	at org.testng.TestNG.run(TestNG.java:1017)
   	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
   	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
   Caused by: java.io.EOFException
   	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
   	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:149)
   	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:460)
   	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
   	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:290)
   	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
   	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:75)
   	... 28 more
   ```
   
   2. `producer.newMessage().key("1").value(null).send()`
   
   If we specify the value is null, the messageMetaData will record the flag `msgMetadataBuilder.setNullValue(true)`, the consumer client will check the nullValue flag before schema decoder decode the payload, if the flag is true will return null directly.
   
   3. Currently, the message value has three forms, normal value, empty byte array, and null value, maybe we could return a null value if the schema message payload is empty byte array or null value, if the topic use the bytes schema consumer could return the original message payload (empty byte array or null value). please confirm @sijie 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie closed issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
sijie closed issue #7407:
URL: https://github.com/apache/pulsar/issues/7407


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 edited a comment on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
gaoran10 edited a comment on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-698094497


   @fmiguelez Sorry, late response.
   
   1. `producer.newMessage().key("1").send()`
   
   Currently, if we send messages as above the message payload is `ByteBuffer.allocate(0)`, thus if the topic with Avro schema, the decoder can't decode an empty byte array, this will cause java.io.EOFException as below.
   
   ```
   org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException
   
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:77)
   	at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:36)
   	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:107)
   	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
   	at org.apache.pulsar.broker.transaction.TransactionProduceTest.test(TransactionProduceTest.java:639)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
   	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
   	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
   	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
   	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
   	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
   	at org.testng.TestRunner.privateRun(TestRunner.java:648)
   	at org.testng.TestRunner.run(TestRunner.java:505)
   	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
   	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
   	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
   	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
   	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
   	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
   	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
   	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
   	at org.testng.TestNG.runSuites(TestNG.java:1049)
   	at org.testng.TestNG.run(TestNG.java:1017)
   	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
   	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
   Caused by: java.io.EOFException
   	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
   	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:149)
   	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:460)
   	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
   	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:290)
   	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
   	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:75)
   	... 28 more
   ```
   
   2. `producer.newMessage().key("1").value(null).send()`
   
   If we specify the value is null, the messageMetaData will record the flag `msgMetadataBuilder.setNullValue(true)`, the consumer client will check the nullValue flag before schema decoder decode the payload, if the flag is true will return null value directly.
   
   3. Currently, the message value has three forms, normal value, empty byte array, and null value, if topics use the struct schema consumer could return a null value for empty byte array and null value, if topics use the bytes schema consumer could return the original message payload (empty byte array or null value).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] fmiguelez commented on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
fmiguelez commented on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-783457298


   @sijie, @gaoran10 Any update on this?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jiazhai commented on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
jiazhai commented on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-652741491


   Thanks @fmiguelez for open the issue and related PR. @gaoran10 to help review the Pr.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 edited a comment on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
gaoran10 edited a comment on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-698094497


   @fmiguelez Sorry, late response.
   
   1. `producer.newMessage().key("1").send()`
   
   Currently, if we send messages as above the message payload is `ByteBuffer.allocate(0)`, thus if the topic with Avro schema, the decoder can't decode an empty byte array, this will cause java.io.EOFException as below.
   
   ```
   org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException
   
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:77)
   	at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:36)
   	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:107)
   	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
   	at org.apache.pulsar.broker.transaction.TransactionProduceTest.test(TransactionProduceTest.java:639)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
   	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
   	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
   	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
   	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
   	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
   	at org.testng.TestRunner.privateRun(TestRunner.java:648)
   	at org.testng.TestRunner.run(TestRunner.java:505)
   	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
   	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
   	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
   	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
   	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
   	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
   	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
   	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
   	at org.testng.TestNG.runSuites(TestNG.java:1049)
   	at org.testng.TestNG.run(TestNG.java:1017)
   	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
   	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
   Caused by: java.io.EOFException
   	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
   	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:149)
   	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:460)
   	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
   	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:290)
   	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
   	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
   	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:75)
   	... 28 more
   ```
   
   2. `producer.newMessage().key("1").value(null).send()`
   
   If we specify the value is null, the messageMetaData will record the flag `msgMetadataBuilder.setNullValue(true)`, the consumer client will check the nullValue flag before schema decoder decode the payload, if the flag is true will return null value directly.
   
   3. Currently, the message value has three forms, normal value, empty byte array, and null value, if topics use the Avro schema consumer couldn't handle the empty byte array payload, maybe the consumer should return a null value for the empty byte array.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on issue #7407: Support for tombstones (null value in messages) does not work

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on issue #7407:
URL: https://github.com/apache/pulsar/issues/7407#issuecomment-785510836


   @fmiguelez Is there any block about this issue? Could you provide more details?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org