Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/06 04:40:10 UTC

[GitHub] [pulsar] BewareMyPower opened a new issue, #15036: [C++] Schema version not added to a message

BewareMyPower opened a new issue, #15036:
URL: https://github.com/apache/pulsar/issues/15036

   **Describe the bug**
   When the C++ client sends a message to a topic with a protobuf native schema, the message does not carry a schema version.
   
   **To Reproduce**
   First, run the C++ unit test to send a message to a topic named `ProtobufSchemaTest-testEndToEnd`.
   
   ```bash
   ./tests/main --gtest_filter='ProtobufNativeSchemaTest.testEndToEnd'  
   ```
   
   Querying the topic's schema shows that it is already `PROTOBUF_NATIVE`:
   
   ```bash
   $ curl -L http://localhost:8080/admin/v2/schemas/public/default/ProtobufSchemaTest-testEndToEnd/schema
   {"version":0,"type":"PROTOBUF_NATIVE","timestamp":0,"data":"{\"fileDescriptorSet\":\"CtMDCgpUZXN0LnByb3RvEgVwcm90bxoSRXh0ZXJuYWxUZXN0LnByb3RvImUKClN1Yk1lc3NhZ2USCwoDZm9vGAEgASgJEgsKA2JhchgCIAEoARo9Cg1OZXN0ZWRNZXNzYWdlEgsKA3VybBgBIAEoCRINCgV0aXRsZRgCIAEoCRIQCghzbmlwcGV0cxgDIAMoCSLlAQoLVGVzdE1lc3NhZ2USEwoLc3RyaW5nRmllbGQYASABKAkSEwoLZG91YmxlRmllbGQYAiABKAESEAoIaW50RmllbGQYBiABKAUSIQoIdGVzdEVudW0YBCABKA4yDy5wcm90by5UZXN0RW51bRImCgtuZXN0ZWRGaWVsZBgFIAEoCzIRLnByb3RvLlN1Yk1lc3NhZ2USFQoNcmVwZWF0ZWRGaWVsZBgKIAMoCRI4Cg9leHRlcm5hbE1lc3NhZ2UYCyABKAsyHy5wcm90by5leHRlcm5hbC5FeHRlcm5hbE1lc3NhZ2UqJAoIVGVzdEVudW0SCgoGU0hBUkVEEAASDAoIRkFJTE9WRVIQAUItCiVvcmcuYXBhY2hlLnB1bHNhci5jbGllbnQuc2NoZW1hLnByb3RvQgRUZXN0YgZwcm90bzMKoAEKEkV4dGVybmFsVGVzdC5wcm90bxIOcHJvdG8uZXh0ZXJuYWwiOwoPRXh0ZXJuYWxNZXNzYWdlEhMKC3N0cmluZ0ZpZWxkGAEgASgJEhMKC2RvdWJsZUZpZWxkGAIgASgBQjUKJW9yZy5hcGFjaGUucHVsc2FyLmNsaWVudC5zY2hlbWEucHJvdG9CDEV4dGVybmFsVGVzdGIGcHJvdG8z\",\"rootMessageTypeName\":\"proto.TestMessage\",\"rootFileDescriptorName\":\"Test.proto\"}","properties":{}}
   ```
   
   Then run the following Java application, which sends another message (built from the same [Test.proto](https://github.com/apache/pulsar/blob/master/pulsar-client/src/test/proto/Test.proto)) and then consumes both messages.
   
   ```java
           final String topic = "ProtobufSchemaTest-testEndToEnd";
           try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
               Producer<Test.TestMessage> producer = client.newProducer(ProtobufNativeSchema.of(Test.TestMessage.class))
                       .topic(topic)
                       .create();
               producer.send(Test.TestMessage.newBuilder().setTestEnum(Test.TestEnum.SHARED).build());
   
               Consumer<Test.TestMessage> consumer = client.newConsumer(ProtobufNativeSchema.of(Test.TestMessage.class))
                       .topic(topic)
                       .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                       .subscriptionName("my-sub")
                       .subscribe();
               for (int i = 0; i < 2; i++) {
                   final Message<Test.TestMessage> msg = consumer.receive(2, TimeUnit.SECONDS);
                   if (msg == null) break;
                   // NOTE: getReaderSchema() returns null in this case
                   System.out.println(i + ", enum value: " + msg.getValue().getTestEnum()
                           + ", schema version: "
                           + (msg.getSchemaVersion() == null ? "(null)" : Schema.INT64.decode(msg.getSchemaVersion()))
                   );
               }
   
               Consumer<GenericRecord> autoConsumer = client.newConsumer(Schema.AUTO_CONSUME())
                       .topic(topic)
                       .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                       .subscriptionName("my-sub-auto-consume")
                       .subscribe();
               for (int i = 0; i < 2; i++) {
                   final Message<GenericRecord> msg = autoConsumer.receive(2, TimeUnit.SECONDS);
                   if (msg == null) break;
                   System.out.println(i + " reader schema: "
                           + msg.getReaderSchema().map(Object::toString).orElse("(null)"));
               }
           }
   ```
   
   The output is
   
   ```
   0, enum value: FAILOVER, schema version: (null)
   1, enum value: SHARED, schema version: 0
   0 reader schema: org.apache.pulsar.client.impl.schema.BytesSchema@72cc7e6f
   1 reader schema: VersionedSchema(type=PROTOBUF_NATIVE,schemaVersion=\x00\x00\x00\x00\x00\x00\x00\x00,name=public/default/ProtobufSchemaTest-testEndToEnd)
   ```
   
   Message 0, whose enum value is `FAILOVER`, was sent by the C++ producer. However, it carries no schema version, and its reader schema is `BytesSchema` rather than `VersionedSchema`.
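   The output suggests a simple rule: a consumer can only resolve a versioned reader schema when the message carries a schema version. The sketch below is a hypothetical model of that observed outcome, not the Java client's actual logic; `ReceivedMessage` and `readerSchemaFor` are illustrative names, not Pulsar APIs.

   ```cpp
   #include <cassert>
   #include <optional>
   #include <string>

   // Hypothetical model of the reader-schema outcome seen in the output above.
   // It is NOT the Pulsar client's implementation.
   struct ReceivedMessage {
       // Raw schema version bytes, present only if the producer attached them.
       std::optional<std::string> schemaVersion;
   };

   std::string readerSchemaFor(const ReceivedMessage& msg, const std::string& topicSchemaType) {
       if (!msg.schemaVersion) {
           // Without a version there is no way to look up the writer's schema,
           // so the payload can only be handed back as raw bytes.
           return "BytesSchema";
       }
       // With a version, the consumer can fetch the schema registered under it.
       return "VersionedSchema(type=" + topicSchemaType + ")";
   }
   ```

   In this model, message 0 (no version) maps to `BytesSchema` and message 1 (version 0) maps to a `VersionedSchema`, mirroring the two reader schemas printed above.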
   
   In addition, if we add the following code to `TEST(ProtobufNativeSchemaTest, testEndToEnd)`:
   
   ```c++
       if (msg.hasSchemaVersion()) {
           std::cout << "message's schema version: " << msg.getSchemaVersion() << std::endl;
       } else {
           std::cout << "no schema version" << std::endl;
       }
   ```
   
   the output is `no schema version`.
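   In the C++ client, `getSchemaVersion()` hands back the raw version bytes as a string, so streaming it to `std::cout` as in the snippet above prints bytes rather than a number. A minimal sketch for turning it into an integer, assuming the 8-byte big-endian layout that `Schema.INT64.decode()` reads on the Java side (consistent with the `\x00\x00\x00\x00\x00\x00\x00\x00` shown for version 0); `decodeSchemaVersion` is a hypothetical helper, not part of the client API.

   ```cpp
   #include <cassert>
   #include <cstdint>
   #include <optional>
   #include <string>

   // Hypothetical helper: decodes raw schema version bytes into an integer.
   // Assumption: the version is an 8-byte, big-endian, signed 64-bit value.
   std::optional<int64_t> decodeSchemaVersion(const std::string& bytes) {
       if (bytes.size() != 8) {
           return std::nullopt;  // unexpected layout; don't guess
       }
       uint64_t value = 0;
       for (unsigned char b : bytes) {
           value = (value << 8) | b;  // most significant byte first
       }
       return static_cast<int64_t>(value);
   }
   ```

   Once a message carries a version, `decodeSchemaVersion(msg.getSchemaVersion())` would be expected to return 0 for this topic, matching the version the Java consumer printed for message 1.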
   
   **Expected behavior**
   The schema version should be added to messages sent with a protobuf native schema, i.e., the following check should pass:
   
   ```c++
   ASSERT_TRUE(msg.hasSchemaVersion());
   ```
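   As far as we understand the Java client, the broker returns the schema version when a producer is created, and the producer then attaches that version to the metadata of every message it sends. The sketch below models that expected behavior; `ProducerModel`, `OutgoingMetadata`, and `onProducerCreated` are hypothetical names for illustration, not the C++ client's internals or the change made by any particular patch.

   ```cpp
   #include <cassert>
   #include <optional>
   #include <string>
   #include <utility>

   // Hypothetical message metadata: payload plus an optional schema version.
   struct OutgoingMetadata {
       std::string payload;
       std::optional<std::string> schemaVersion;
   };

   // Hypothetical, simplified producer that remembers the broker's schema version.
   class ProducerModel {
   public:
       // Called once the broker accepts the producer; an empty string means
       // the broker returned no schema version.
       void onProducerCreated(std::string brokerSchemaVersion) {
           if (!brokerSchemaVersion.empty()) {
               schemaVersion_ = std::move(brokerSchemaVersion);
           }
       }

       // Expected behavior: every message carries the producer's schema version.
       OutgoingMetadata build(std::string payload) const {
           return OutgoingMetadata{std::move(payload), schemaVersion_};
       }

   private:
       std::optional<std::string> schemaVersion_;
   };
   ```

   With this design, the `ASSERT_TRUE(msg.hasSchemaVersion())` check above corresponds to `schemaVersion` being engaged on every message built after the producer is created.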
   
   In addition, we might need to add a `Message::getReaderSchema()` API to the C++ client.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower closed issue #15036: [C++] Schema version not added to a message

Posted by GitBox <gi...@apache.org>.
BewareMyPower closed issue #15036: [C++] Schema version not added to a message 
URL: https://github.com/apache/pulsar/issues/15036




[GitHub] [pulsar] BewareMyPower commented on issue #15036: [C++] Schema version not added to a message

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #15036:
URL: https://github.com/apache/pulsar/issues/15036#issuecomment-1091863190

   I also tried consuming the messages sent by the C++ client with the `AUTO_CONSUME` schema.
   
   ```java
           final String topic = "ProtobufSchemaTest-testEndToEnd";
           try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
               Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
                       .topic(topic)
                       .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                       .subscriptionName("my-sub-auto-consume")
                       .subscribe();
               while (true) {
                   Message<GenericRecord> msg = consumer.receive(1, TimeUnit.SECONDS);
                   if (msg == null) {
                       break;
                   }
                   try {
                       DynamicMessage message = ((GenericProtobufNativeRecord) msg.getValue()).getProtobufRecord();
                       Descriptors.FieldDescriptor fieldDescriptor =
                               message.getDescriptorForType().findFieldByName("testEnum");
                       if (fieldDescriptor == null) continue;
                       System.out.println("enum value: " + message.getField(fieldDescriptor));
                   } catch (ClassCastException e) {
                       System.out.println(" failed to convert value to GenericProtobufNativeRecord: "
                               + e.getMessage());
                   }
               }
           }
   ```
   
   The output is
   
   ```
   enum value: FAILOVER
   ```
   
   So consuming the C++-produced message with `AUTO_CONSUME` works now.




[GitHub] [pulsar] BewareMyPower commented on issue #15036: [C++] Schema version not added to a message

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #15036:
URL: https://github.com/apache/pulsar/issues/15036#issuecomment-1091850972

   After applying the patch from #15072, the output of the Java application from the issue description became:
   
   ```
   0, enum value: FAILOVER, schema version: 0
   1, enum value: SHARED, schema version: 0
   0 reader schema: VersionedSchema(type=PROTOBUF_NATIVE,schemaVersion=\x00\x00\x00\x00\x00\x00\x00\x00,name=public/default/ProtobufSchemaTest-testEndToEnd)
   1 reader schema: VersionedSchema(type=PROTOBUF_NATIVE,schemaVersion=\x00\x00\x00\x00\x00\x00\x00\x00,name=public/default/ProtobufSchemaTest-testEndToEnd)
   ```

