Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/08/27 11:55:05 UTC

[GitHub] [pulsar] borlandor opened a new issue #5057: Can't publish messages with schema on Pulsar C++ client

URL: https://github.com/apache/pulsar/issues/5057
 
 
   **Describe the bug**
   Using the Pulsar C++ client, I can't publish messages with the desired schema.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Use the C++ client as the producer and the Python client as the consumer.
   2. Use a JSON schema defined as:
   static const std::string exampleSchema =
       "{\"type\":\"record\",\"name\":\"Example\","
       "\"fields\":[{\"name\":\"a\",\"type\":[\"null\", \"int\" ]},{\"name\":\"b\",\"type\":[\"null\",\"int\"]}]}";
   3. The producer sends a struct with the values a=100 and b=200:
   struct Example
   {
   	int a;	
   	int b;	
   };
   4. The consumer receives a message in an incorrect format:
   0200000000000000a08baa13ed7f00000800000000000000ffffffffffffffff0000000064000000c800000000
   which raises this error:
   Received message msg
   0200000000000000a08baa13ed7f00000800000000000000ffffffffffffffff0000000064000000c800000000
   Traceback (most recent call last):
     File "ConsumerForSchema.py", line 23, in <module>
       printMem(msg.value())
     File "/home/Pulsar/apache-pulsar-2.4.0-src/pulsar-client-cpp/python/pulsar/__init__.py", line 160, in value
       return self._schema.decode(self._message.data())
     File "/home/Pulsar/apache-pulsar-2.4.0-src/pulsar-client-cpp/python/pulsar/schema/schema.py", line 86, in decode
       return self._record_cls(**json.loads(data))
     File "/usr/lib64/python2.7/json/__init__.py", line 338, in loads
       return _default_decoder.decode(s)
     File "/usr/lib64/python2.7/json/decoder.py", line 366, in decode
       obj, end = self.raw_decode(s, idx=_w(s, 0).end())
     File "/usr/lib64/python2.7/json/decoder.py", line 384, in raw_decode
       raise ValueError("No JSON object could be decoded")
   ValueError: No JSON object could be decoded
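The dump above can be taken apart to see what the consumer actually received. The following is a minimal diagnostic sketch, not part of the original report. It assumes the dump is a 64-bit CPython 2.7 string object as printed by printMem, with a 36-byte header and the length stored at offset 16; `extract_payload` and `is_json` are hypothetical helper names. Under those assumptions the payload looks like the raw in-memory bytes of the C++ struct (two little-endian 32-bit ints) rather than JSON text, which would explain the ValueError.

```python
import binascii
import json
import struct

# Memory dump printed by printMem(msg.data()) in the report.
DUMP_HEX = (
    "0200000000000000a08baa13ed7f00000800000000000000"
    "ffffffffffffffff0000000064000000c800000000"
)

# Assumption: 64-bit CPython 2.7 str layout
# (refcount 8 + type pointer 8 + length 8 + hash 8 + state 4 = 36 bytes).
HEADER_SIZE = 36
LENGTH_OFFSET = 16


def extract_payload(dump_hex):
    """Return the string bytes stored after the object header."""
    raw = binascii.unhexlify(dump_hex)
    (length,) = struct.unpack_from("<q", raw, LENGTH_OFFSET)
    return raw[HEADER_SIZE:HEADER_SIZE + length]


def is_json(data):
    """Return True if the bytes parse as a JSON document."""
    try:
        json.loads(data.decode("latin-1"))
        return True
    except ValueError:
        return False


payload = extract_payload(DUMP_HEX)

# Interpret the 8 payload bytes as two little-endian 32-bit ints.
a, b = struct.unpack("<ii", payload)
print("a=%d b=%d json=%s" % (a, b, is_json(payload)))
```

If the layout assumption holds, the values 100 and 200 did arrive, only in binary form, so the consumer-side JSON decoder has nothing it can parse.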
   
   **Expected behavior**
   With JSONSchema, the Python consumer is expected to receive a message like this:
   0200000000000000a09bdd72ee7f00001900000000000000ffffffffffffffff000000007b0a202261223a203130302c200a202262223a203230300a7d00
   where 7b0a202261223a203130302c200a202262223a203230300a7d decodes to:
   {
    "a": 100, 
    "b": 200
   }
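As a quick check of the expected payload, the sketch below decodes the hex string quoted above and parses it as JSON. It also shows, with a hypothetical helper `encode_example`, the kind of bytes a producer would have to put into the message for this to work. This assumes the producer is responsible for serializing the struct to JSON text itself (for example, building the string and passing it to MessageBuilder().setContent(...) in C++); that is an interpretation of the behavior reported here, not a statement from the Pulsar documentation.

```python
import binascii
import json

# Payload portion of the expected message, as quoted in the report.
EXPECTED_PAYLOAD_HEX = "7b0a202261223a203130302c200a202262223a203230300a7d"

# Decode the hex to text and parse it as a JSON document.
text = binascii.unhexlify(EXPECTED_PAYLOAD_HEX).decode("utf-8")
record = json.loads(text)
print("a=%d b=%d" % (record["a"], record["b"]))


def encode_example(a, b):
    """Hypothetical helper: serialize the two fields as JSON bytes."""
    return json.dumps({"a": a, "b": b}).encode("utf-8")


# Bytes produced this way parse back to the same record.
roundtrip = json.loads(encode_example(100, 200).decode("utf-8"))
```

The exact whitespace of the JSON text does not matter to the consumer; only that the message content is a JSON document with the fields a and b.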
   
   **Additional context**
   1. Pulsar C++ Client -- Producer Code:
   static const std::string exampleSchema =
       "{\"type\":\"record\",\"name\":\"Example\","
       "\"fields\":[{\"name\":\"a\",\"type\":[\"null\", \"int\" ]},{\"name\":\"b\",\"type\":[\"null\",\"int\"]}]}";
   struct Example
   {
   	int a;	
   	int b;	
   };
   ClientConfiguration config;
   Client client(lookupUrl);
   Result res;
   
   Producer producer;
   ProducerConfiguration producerConf;
   producerConf.setSchema(SchemaInfo(JSON, "Json", exampleSchema));
   res = client.createProducer("topic-avro1", producerConf, producer);
   // Publish one message to the topic
   Example myExample;
   myExample.a = 100;
   myExample.b = 200;
   // Note: this sends the raw in-memory bytes of the struct as the message content
   Message msg = MessageBuilder().setContent(&myExample, sizeof(Example)).build();
   res = producer.send(msg);
   
   producer.close();
   client.close();
   
   2. Python Client -- Consumer Code:
   import pulsar
   from pulsar.schema import *
   
   class Example(Record):
       a = Integer()
       b = Integer()
   
   def printMem(data):
       from ctypes import string_at
       from sys import getsizeof
       from binascii import hexlify
       print(hexlify(string_at(id(data), getsizeof(data))))
           
   client = pulsar.Client('pulsar://localhost:6650')
   consumer = client.subscribe(
                      topic='topic-avro1',
                      subscription_name='my-subscription',
                      schema=JsonSchema(Example))
   while True:
       msg = consumer.receive()
       print("Received message msg")
       printMem(msg.data())
       printMem(msg.value())
       ex = msg.value()
       try:
           # print("Received message a={} b={} ".format(ex.a, ex.b))
           # Acknowledge successful processing of the message
           consumer.acknowledge(msg)
       except:
           # Message failed to be processed
           consumer.negative_acknowledge(msg)
   client.close()
   
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services