You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/08/27 11:55:05 UTC
[GitHub] [pulsar] borlandor opened a new issue #5057: Can't publish messages
with schema on Pulsar C++ client
borlandor opened a new issue #5057: Can't publish messages with schema on Pulsar C++ client
URL: https://github.com/apache/pulsar/issues/5057
**Describe the bug**
Using Pulsar C++ client,I can't publish messages with the desired schema .
**To Reproduce**
Steps to reproduce the behavior:
1. C++ Client as Producer,Python Client as Consumer
2. Using JSONSchema which defined as:
static const std::string exampleSchema =
"{\"type\":\"record\",\"name\":\"Example\","
"\"fields\":[{\"name\":\"a\",\"type\":[\"null\", \"int\" ]},{\"name\":\"b\",\"type\":[\"null\",\"int\"]}]}";
3. Producer send a struct with value (a=100 and b=200):
struct Example
{
int a;
int b;
};
4. Consumer received message with incorrect format:
0200000000000000a08baa13ed7f00000800000000000000ffffffffffffffff0000000064000000c800000000
which raised error:
Received message msg
0200000000000000a08baa13ed7f00000800000000000000ffffffffffffffff0000000064000000c800000000
Traceback (most recent call last):
File "ConsumerForSchema.py", line 23, in <module>
printMem(msg.value())
File "/home/Pulsar/apache-pulsar-2.4.0-src/pulsar-client-cpp/python/pulsar/__init__.py", line 160, in value
return self._schema.decode(self._message.data())
File "/home/Pulsar/apache-pulsar-2.4.0-src/pulsar-client-cpp/python/pulsar/schema/schema.py", line 86, in decode
return self._record_cls(**json.loads(data))
File "/usr/lib64/python2.7/json/__init__.py", line 338, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python2.7/json/decoder.py", line 366, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib64/python2.7/json/decoder.py", line 384, in raw_decode
raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded
**Expected behavior**
Python Consumer expected to receive message like this with JSONSchema:
0200000000000000a09bdd72ee7f00001900000000000000ffffffffffffffff000000007b0a202261223a203130302c200a202262223a203230300a7d00
7b0a202261223a203130302c200a202262223a203230300a7d means:
{
"a": 100,
"b": 200
}
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Desktop (please complete the following information):**
- OS: [e.g. iOS]
**Additional context**
1. Pulsar C++ Client -- Producer Code:
static const std::string exampleSchema =
"{\"type\":\"record\",\"name\":\"Example\","
"\"fields\":[{\"name\":\"a\",\"type\":[\"null\", \"int\" ]},{\"name\":\"b\",\"type\":[\"null\",\"int\"]}]}";
struct Example
{
int a;
int b;
};
ClientConfiguration config;
Client client(lookupUrl);
Result res;
Producer producer;
ProducerConfiguration producerConf;
producerConf.setSchema(SchemaInfo(JSON, "Json", exampleSchema));
res = client1.createProducer("topic-avro1", producerConf, producer);
// Publish 10 messages to the topic
Example myExample;
myExample.a = 100;
myExample.b = 200;
Message msg = MessageBuilder().setContent(&myExample,sizeof(Example)).build();
Result res = producer.send(msg);
producer.close();
client.close();
2. Python Client -- Consumer Code:
import pulsar
from pulsar.schema import *
class Example(Record):
a = Integer()
b = Integer()
def printMem(data):
from ctypes import string_at
from sys import getsizeof
from binascii import hexlify
print(hexlify(string_at(id(data), getsizeof(data))))
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe(
topic='topic-avro1',
subscription_name='my-subscription',
schema=JsonSchema(Example))
while True:
msg = consumer.receive()
print("Received message msg")
printMem(msg.data())
printMem(msg.value())
ex = msg.value()
try:
# print("Received message a={} b={} ".format(ex.a, ex.b))
# Acknowledge successful processing of the message
consumer.acknowledge(msg)
except:
# Message failed to be processed
consumer.negative_acknowledge(msg)
client.close()
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services