Posted by Apache Pulsar Slack on 2019/03/04 09:11:07 UTC

Slack digest for #general - 2019-03-04

2019-03-03 09:35:58 UTC - Ben S: @Ben S has joined the channel
2019-03-03 09:40:02 UTC - Ben S: Hey all!
2019-03-03 09:40:36 UTC - Ali Ahmed: Hello
2019-03-03 09:44:16 UTC - Ben S: I have a problem that I'm currently stuck on. Consider the following example:

- There are tens of thousands of shops around the world where internal processes should be recorded

- For each shop location the order of internal process tasks must be preserved

- There is no global list of shops. Process tasks may fly in from one location, without previous knowledge about that location

- Recording the process tasks is critical, no task must be missed (failover)
My understanding is that in order to preserve the order of processes within each location, I need to create a topic for each store location like this:


And for each topic I need to create an exclusive consumer with failovers.

The problem is that I do not know the locations in advance, so what is the best way to create and destroy consumers ad hoc for each unknown location?
2019-03-03 09:46:42 UTC - Ali Ahmed: you can create a consumer the same time you start a producer with a new topic
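A minimal sketch of the suggestion above, under stated assumptions: the topic naming scheme (`persistent://public/default/shop-<location>`) and the `subscribe` callable, which stands in for a real client call, are both hypothetical and do not appear in the thread. It only illustrates creating a consumer the first time a location is seen and dropping it again later.

```python
from typing import Callable, Dict


def topic_for(location: str) -> str:
    # Hypothetical naming scheme: one topic per shop location.
    return f"persistent://public/default/shop-{location}"


class ConsumerRegistry:
    """Creates one consumer per location on first sight and closes it on demand."""

    def __init__(self, subscribe: Callable[[str], object]):
        # `subscribe` is an assumed factory, e.g. a wrapper around a real
        # client call; it receives a topic name and returns a consumer.
        self._subscribe = subscribe
        self._consumers: Dict[str, object] = {}

    def get_or_create(self, location: str):
        # Create the consumer only when a location shows up for the first time.
        if location not in self._consumers:
            self._consumers[location] = self._subscribe(topic_for(location))
        return self._consumers[location]

    def release(self, location: str) -> None:
        # Drop the consumer for a location that is no longer active.
        consumer = self._consumers.pop(location, None)
        close = getattr(consumer, "close", None)
        if callable(close):
            close()

    def active_locations(self):
        return sorted(self._consumers)
```

Passing the factory in keeps the bookkeeping separate from the client library, so the same registry could be driven by whatever process first sees a task from a new location.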
2019-03-03 09:55:14 UTC - Ben S: Oh, I realize I oversimplified my example. My actual case looks more complex, and different. I need to add one more point to the example:
- There are multiple processes. Ordering must be kept within each process. But in order for the system to scale, it is important that individual process steps can be parallelized. 
2019-03-03 09:57:35 UTC - Ali Ahmed: are you trying to replicate a traditional job queue with arbitrary num of workers ?
2019-03-03 09:59:47 UTC - Ben S: In a way, it is a combination of a tiny job queue with two consecutive tasks, but it's part of a real-time event system
2019-03-03 10:01:30 UTC - Ali Ahmed: you should be able to accomplish it with <>
2019-03-03 10:01:35 UTC - Ali Ahmed: <>
2019-03-03 11:39:29 UTC - Ben S: Thank you, this looks interesting
2019-03-03 11:46:11 UTC - Yuvaraj Loganathan: Hi Team,
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.pulsar.client.api.*;
import schematest1.User;


public class SchemaConsumer {
	public static void main(String[] args) {
		PulsarClient client = null;
		Consumer<User> consumer = null;
		try {
			ReflectDatumReader<User> reader = new ReflectDatumReader<>(User.getClassSchema());
			client = PulsarClient.builder()
			consumer = client.newConsumer(Schema.AVRO(User.class))
			Message<User> msg = consumer.receive();

		} catch (PulsarClientException e) {
		} catch (IOException e) {
How can I deserialize msg to a User object?
2019-03-03 11:48:35 UTC - Ali Ahmed: ```msg.getValue()```
2019-03-03 11:50:45 UTC - Yuvaraj Loganathan: @Ali Ahmed Thanks! Will raise a pull request for the docs! :slightly_smiling_face:
+1 : Sijie Guo
2019-03-03 15:40:03 UTC - Yuvaraj Loganathan: Hi Team ,
import pulsar
from pulsar import ConsumerType, MessageId
from pulsar.schema import Record, String, Integer, AvroSchema

from schematest1.User import User

client = pulsar.Client('pulsar://pulsar.stage.xx.xx:6650')

consumer = client.subscribe('persistent://public/default/schema-test1', 'my-subscription1', consumer_type=ConsumerType.Failover, schema=AvroSchema(User))
while True:
    msg = consumer.receive()


I am trying to consume Avro messages from a topic produced by a Java producer, but I receive this exception: Pulsar error: IncompatibleSchema
Python Schema
from pulsar.schema import Record, String, Integer, AvroSchema
class User(Record):
    name = String()
    favorite_number = Integer()
    favorite_color = String()
    age = Integer(default=18)
Here is AVRO Schema
  "namespace": "schematest1",
  "type": "record",
  "name": "User",
  "fields": [
      "name": "name",
      "type": "string"
      "name": "favorite_number",
      "type": [
      "name": "favorite_color",
      "type": "string"
      "name": "age",
      "type": "int",
      "default": 18
2019-03-03 15:54:16 UTC - Matteo Merli: @Yuvaraj Loganathan Is this schema coming from the Java POJO?
2019-03-03 15:56:49 UTC - Matteo Merli: The difference that is probably affecting this is that `favorite_color` and `age` have no “null” type option. Can you try with `favorite_color = String(required=True)` and the same for age?
2019-03-03 16:55:39 UTC - Yuvaraj Loganathan: Yes
2019-03-03 16:55:58 UTC - Yuvaraj Loganathan: Here is the broker schema get response ```{
    "version": 0,
    "type": "AVRO",
    "timestamp": 0,
    "data": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"schematest1\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"favorite_number\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"favorite_color\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":\"int\"}]}",
    "properties": {}
}``` for the topic
2019-03-03 16:59:01 UTC - Yuvaraj Loganathan: Still the same response ```from pulsar.schema import Record, String, Integer, AvroSchema
class User(Record):
    name = String(required=True)
    favorite_number = Integer()
    favorite_color = String(required=True)
    age = Integer(default=18,required=True) ```
2019-03-03 17:01:26 UTC - Yuvaraj Loganathan: Is it possible to get the schema from the message so that we can manually decode the bytes?
2019-03-03 17:02:28 UTC - Matteo Merli: Yes, `User.schema()` should get you that 
2019-03-03 17:05:50 UTC - Matteo Merli: (maybe I misunderstood your question: currently it’s not doing an “auto-deserialize” kind of thing)
2019-03-03 17:06:00 UTC - Yuvaraj Loganathan: `{'name': 'User', 'type': 'record', 'fields': [{'name': 'age', 'type': 'int'}, {'name': 'favorite_color', 'type': 'string'}, {'name': 'favorite_number', 'type': ['null', 'int']}, {'name': 'name', 'type': 'string'}]}` This is the response
2019-03-03 17:07:05 UTC - Yuvaraj Loganathan: The namespace is missing in the `User.schema()` output. Is there a way to define a namespace in Python?
2019-03-03 17:09:11 UTC - Matteo Merli: No, but that is not breaking the Avro validation. We have tests for these cases between Java & Python
2019-03-03 17:10:53 UTC - Matteo Merli: eg:
2019-03-03 17:14:42 UTC - Yuvaraj Loganathan: Thanks @Matteo Merli Will go through and come back
2019-03-03 17:15:20 UTC - Guy Feldman: FWIW I also had some issues with incompatible schema messages
2019-03-03 17:16:00 UTC - Matteo Merli: Indeed there might be some uncovered corners :slightly_smiling_face:
2019-03-03 17:16:22 UTC - Guy Feldman: I ended up resolving it by hitting the schema endpoints when creating the topic
2019-03-03 17:16:35 UTC - Matteo Merli: @Yuvaraj Loganathan can you share the Java POJO as well, to create a test case for that?
2019-03-03 17:20:18 UTC - Guy Feldman: @Yuvaraj Loganathan you also may want to add a default value of `null` for the favorite number field
2019-03-03 17:20:31 UTC - Guy Feldman: if you want it to be optional
2019-03-03 17:20:52 UTC - Matteo Merli: that shouldn’t be required, ideally..
2019-03-03 17:23:17 UTC - Guy Feldman: yeah i guess its not
2019-03-03 17:24:21 UTC - Yuvaraj Loganathan: I first wrote the Avro schema file, then generated the Avro class definitions from it, and gave the generated POJO to the Pulsar producer.
2019-03-03 17:26:42 UTC - Matteo Merli: the thing is that avro doesn’t tell *why* it’s not compatible :confused:
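Since Avro does not report why two schemas are incompatible, a field-by-field comparison can help narrow it down. The sketch below is a plain-Python helper (the name `diff_fields` is made up for illustration; it is not part of Pulsar or Avro) that compares the field types in the broker's schema `data` string with the dict returned by `User.schema()`, using the values posted earlier in this thread. It only compares declared types by field name and does not implement Avro's compatibility rules.

```python
import json


def diff_fields(broker_schema: dict, client_schema: dict) -> dict:
    """Return {field_name: (broker_type, client_type)} for every mismatch."""
    broker = {f["name"]: f["type"] for f in broker_schema["fields"]}
    client = {f["name"]: f["type"] for f in client_schema["fields"]}
    mismatches = {}
    for name in sorted(set(broker) | set(client)):
        # A field missing on one side is reported with None for that side.
        if broker.get(name) != client.get(name):
            mismatches[name] = (broker.get(name), client.get(name))
    return mismatches


# "data" string from the broker schema response quoted earlier in the thread.
broker_data = (
    '{"type":"record","name":"User","namespace":"schematest1","fields":['
    '{"name":"name","type":["null","string"],"default":null},'
    '{"name":"favorite_number","type":["null","int"],"default":null},'
    '{"name":"favorite_color","type":["null","string"],"default":null},'
    '{"name":"age","type":"int"}]}'
)

# User.schema() output quoted earlier (string fields declared required=True).
client_schema = {
    "name": "User",
    "type": "record",
    "fields": [
        {"name": "age", "type": "int"},
        {"name": "favorite_color", "type": "string"},
        {"name": "favorite_number", "type": ["null", "int"]},
        {"name": "name", "type": "string"},
    ],
}

for field, (broker_type, client_type) in diff_fields(
    json.loads(broker_data), client_schema
).items():
    print(f"{field}: broker={broker_type} client={client_type}")
```

With these inputs it reports `favorite_color` and `name`, the two fields where the broker has a `["null", "string"]` union and the Python class declares a plain `string`.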
2019-03-03 17:27:13 UTC - Yuvaraj Loganathan: ```from pulsar.schema import Record, String, Integer, AvroSchema
class User(Record):
    name = String()
    favorite_number = Integer()
    favorite_color = String()
    age = Integer(default=18,required=True)``` This is working for me now.
Broker Schema Response is
{
    "version": 0,
    "type": "AVRO",
    "timestamp": 0,
    "data": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"schematest1\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"favorite_number\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"favorite_color\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":\"int\"}]}",
    "properties": {}
}
2019-03-03 17:28:35 UTC - Yuvaraj Loganathan: `User.schema()` response is
{"name": "User", "type": "record", "fields": [{"name": "age", "type": "int"}, {"name": "favorite_color", "type": ["null", "string"]}, {"name": "favorite_number", "type": ["null", "int"]}, {"name": "name", "type": ["null", "string"]}]}
2019-03-03 17:29:17 UTC - Matteo Merli: I guess that when a field has `default=xyz` we could automatically treat it as `required=True`, because the value will always be there?
2019-03-03 17:30:42 UTC - Yuvaraj Loganathan: Now receiving this error
```Traceback (most recent call last):
  File "/home/uva/PycharmProjects/learn/", line 16, in <module>
  File "/home/uva/yyyy/sources/xxxxx/learn/lib/python3.5/site-packages/pulsar/", line 156, in value
    return self._schema.decode(
  File "/home/uva/beam/sources/aws-sc-worker/learn/lib/python3.5/site-packages/pulsar/schema/", line 103, in decode
    d = fastavro.schemaless_reader(buffer, self._schema)
  File "fastavro/_read.pyx", line 763, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 773, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 564, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 460, in fastavro._read.read_record
  File "fastavro/_read.pyx", line 562, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 435, in fastavro._read.read_union
IndexError: list index out of range```
2019-03-03 17:32:09 UTC - Yuvaraj Loganathan: Yes!
2019-03-03 17:32:38 UTC - Yuvaraj Loganathan: Once I was able to match the broker schema and `User.schema()`, the IncompatibleSchema error went away. But the above error appears.
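One possible explanation for the `IndexError`, not confirmed in this thread: Avro's binary encoding writes record fields in the order declared by the writer's schema and carries no field names, so a schemaless read has to use a schema with the same field order. The broker schema lists `name, favorite_number, favorite_color, age`, while the `User.schema()` output lists the fields alphabetically. The sketch below is a tiny hand-written encoder/decoder covering only `int`, `string`, and `["null", T]` unions (it is not fastavro, and the sample values are made up) to show how reading with the reordered schema can run past the end of a union's branch list.

```python
def zigzag_encode(n: int) -> bytes:
    # Avro ints/longs: zigzag, then variable-length base-128.
    n = (n << 1) ^ (n >> 63)
    out = bytearray()
    while n & ~0x7F:
        out.append((n & 0x7F) | 0x80)
        n >>= 7
    out.append(n)
    return bytes(out)


def zigzag_decode(buf: bytes, pos: int):
    shift = result = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1), pos


def encode(schema, value) -> bytes:
    if isinstance(schema, list):            # union: branch index, then value
        index = 0 if value is None else 1   # assumes ["null", T]
        return zigzag_encode(index) + encode(schema[index], value)
    if schema == "null":
        return b""
    if schema == "int":
        return zigzag_encode(value)
    if schema == "string":
        raw = value.encode("utf-8")
        return zigzag_encode(len(raw)) + raw
    raise ValueError(f"unsupported type: {schema}")


def decode(schema, buf, pos=0):
    if isinstance(schema, list):
        index, pos = zigzag_decode(buf, pos)
        return decode(schema[index], buf, pos)  # IndexError if index is bogus
    if schema == "null":
        return None, pos
    if schema == "int":
        return zigzag_decode(buf, pos)
    if schema == "string":
        length, pos = zigzag_decode(buf, pos)
        return buf[pos:pos + length].decode("utf-8"), pos + length
    raise ValueError(f"unsupported type: {schema}")


def encode_record(fields, record) -> bytes:
    return b"".join(encode(ftype, record[name]) for name, ftype in fields)


def decode_record(fields, buf):
    pos, out = 0, {}
    for name, ftype in fields:
        out[name], pos = decode(ftype, buf, pos)
    return out


# Field order as registered on the broker (writer side).
writer_fields = [
    ("name", ["null", "string"]),
    ("favorite_number", ["null", "int"]),
    ("favorite_color", ["null", "string"]),
    ("age", "int"),
]
# Same fields in alphabetical order, as in the User.schema() output.
reader_fields = sorted(writer_fields)

record = {"name": "Alice", "favorite_number": 7, "favorite_color": "blue", "age": 18}
payload = encode_record(writer_fields, record)

print(decode_record(writer_fields, payload))  # round-trips with the writer's order
try:
    decode_record(reader_fields, payload)
except IndexError as exc:
    print("reordered schema failed:", exc)
```

In this toy case the reader takes the union index of `name` for `age`, then takes the string length of `"Alice"` for the union index of `favorite_color`, which is out of range. Whether the same thing happened in the real consumer would need checking against the actual message bytes.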
2019-03-03 17:33:08 UTC - Matteo Merli: That I haven’t seen yet
2019-03-03 17:33:29 UTC - Matteo Merli: I’ll try with the above schema definitions later
2019-03-03 17:38:52 UTC - Guy Feldman: @Yuvaraj Loganathan i don't know if this is related, but are you using the avro maven plugin
2019-03-03 17:39:01 UTC - Yuvaraj Loganathan: @Guy Feldman Yes
2019-03-03 17:39:14 UTC - Guy Feldman: Did you define a string type configuration
2019-03-03 17:40:08 UTC - Guy Feldman: nm it shouldn't matter
2019-03-03 17:40:15 UTC - Yuvaraj Loganathan: No.
2019-03-03 17:40:29 UTC - Yuvaraj Loganathan: I have generated the POJO using avro maven plugin
2019-03-03 17:40:34 UTC - Guy Feldman: by default the maven plugin uses character arrays
2019-03-03 17:40:45 UTC - Guy Feldman: you have to tell it to use a string object
2019-03-03 17:40:48 UTC - Yuvaraj Loganathan: Ah..
2019-03-03 17:41:09 UTC - Yuvaraj Loganathan: Also My Schema is ```{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
      "name": "name",
      "type": "string"
      "name": "favorite_number",
      "type": [
      "name": "favorite_color",
      "type": "string"
      "name": "age",
      "type": "int",
      "default": 18
2019-03-03 17:41:11 UTC - Guy Feldman: but that's serde on the java side. I don't think it should matter
2019-03-03 17:41:26 UTC - Matteo Merli: can you also share the generated Java POJO ?
2019-03-03 17:43:09 UTC - Guy Feldman: gonna try a python consumer with my topics which use avro
2019-03-03 17:48:48 UTC - Yuvaraj Loganathan: @Matteo Merli <>
+1 : Matteo Merli
2019-03-03 17:56:24 UTC - Sijie Guo: there is a problem regarding current avro POJO
2019-03-03 17:59:25 UTC - Sijie Guo: In the Java Avro POJO schema, it always converts the fields into nullable ones. <>
2019-03-03 18:00:23 UTC - Matteo Merli: Oh, I thought that was Avro default setting
2019-03-03 18:00:28 UTC - Sijie Guo: If you are using a schema where fields don’t use a nullable union, you will have incompatibility problems.
2019-03-03 18:00:29 UTC - Sijie Guo: I think AllowNull was added by mistake
2019-03-03 18:00:32 UTC - Matteo Merli: that’s why I did the same behavior in Python
2019-03-03 18:02:05 UTC - Sijie Guo: that’s not correct. It is actually causing a lot of confusion. @CongBo or @Penghui Li found the problem and they were saying they will send out a fix for it.
2019-03-03 18:02:26 UTC - Sijie Guo: We should just let Avro handle the schema generation
2019-03-03 18:02:35 UTC - Sijie Guo: we don’t need to add AllowNull
2019-03-03 18:02:36 UTC - Matteo Merli: uhm, the fix will likely be incompatible
2019-03-03 18:02:50 UTC - Sijie Guo: yes
2019-03-03 18:03:05 UTC - Sijie Guo: but it's already in a very weird state
2019-03-03 18:03:06 UTC - Sijie Guo: :disappointed:
2019-03-03 18:03:54 UTC - Yuvaraj Loganathan: I am seeing the same behaviour: compared with my schema file, the broker-registered schema has an extra null.
2019-03-03 18:04:14 UTC - Sijie Guo: yes
2019-03-03 18:04:42 UTC - Sijie Guo: the extra null was introduced by `AllowNull`. `AllowNull` converts the schema type into a union type.
2019-03-03 18:04:51 UTC - Sijie Guo: s/schema type/field type
2019-03-03 18:05:59 UTC - Sijie Guo: Basically, there is currently no way to specify non-null fields in Pulsar’s POJO Avro schema.
2019-03-03 18:06:15 UTC - Sijie Guo: a safe way to use AvroSchema is to use GenericSchema
2019-03-03 18:06:19 UTC - Yuvaraj Loganathan: Also I do see `default: null` in broker registered schema in all the string fields
2019-03-03 18:06:43 UTC - Sijie Guo: where you can use Avro to build the schema to get around the pojo schema issue
2019-03-03 18:07:27 UTC - Yuvaraj Loganathan: Ok Thanks @Sijie Guo
2019-03-03 18:07:48 UTC - Yuvaraj Loganathan: <> here is the broker schema for this avro schema <>
2019-03-03 18:08:05 UTC - Sijie Guo: @Matteo Merli - for BC, we can rename the AVRO one to AllowNullAvro
2019-03-03 18:08:28 UTC - Sijie Guo: and just introduce a new AVRO schema which doesn’t have AllowNull
2019-03-03 18:08:31 UTC - Sijie Guo: sorry
2019-03-03 18:08:34 UTC - Sijie Guo: keep the current one
2019-03-03 18:08:36 UTC - Matteo Merli: but, in terms of avro auto-generated code, we should extract the same schema as well. So how does one define a “nullable” field in the pojo
2019-03-03 18:08:39 UTC - Sijie Guo: just introduce a new one
2019-03-03 18:09:01 UTC - Matteo Merli: yes, though having 2 is not ideal either :confused:
+1 : Yuvaraj Loganathan
2019-03-03 18:09:27 UTC - Sijie Guo: > So how does one define a “nullable” field in the pojo

@Matteo Merli: currently it is always ‘nullable’ because of AllowNull
2019-03-03 18:09:51 UTC - Matteo Merli: maybe an internal versioning of our Avro handling, to allow migrating to new type
2019-03-03 18:10:05 UTC - Sijie Guo: AllowNull is just a nightmare; @Penghui Li and @CongBo spent quite a lot of time troubleshooting the issue :slightly_smiling_face:
2019-03-03 18:11:08 UTC - Matteo Merli: (yes, and that combined with no specific errors from avro..)
2019-03-03 18:13:12 UTC - Sijie Guo: @Matteo Merli: I think it might be worth checking with @Jerry Peng if there are other considerations regarding AllowNull. (Based on what I learned, it is not needed.)
2019-03-03 18:15:46 UTC - Matteo Merli: Yes
2019-03-03 18:16:35 UTC - Matteo Merli: surely there will be some reason. Let’s check out the options there
2019-03-03 19:43:18 UTC - Guy Feldman: I had trouble serializing a dictionary into this type using the standard avro library
2019-03-03 19:44:19 UTC - Guy Feldman: default value for age doesn't work
2019-03-03 19:44:33 UTC - Guy Feldman: when I specified an age it worked fine
2019-03-03 19:49:53 UTC - Guy Feldman: I think a more general solution is to have the user provide a class with the schema method implemented.
2019-03-03 19:50:44 UTC - Guy Feldman: This way they can parse avro schema with the python avro library
2019-03-03 19:50:51 UTC - Guy Feldman: or provide their own
2019-03-03 19:51:15 UTC - Matteo Merli: though that won’t guarantee that the data complies with such schema
2019-03-03 19:51:42 UTC - Guy Feldman: 
2019-03-03 19:52:18 UTC - Matteo Merli: :slightly_smiling_face:
2019-03-03 19:52:39 UTC - Guy Feldman: alternatively, we can use the python avro library for reading the schemas
2019-03-03 19:53:14 UTC - Matteo Merli: reading the json?
2019-03-03 19:53:22 UTC - Guy Feldman: that's what i'm doing because my schemas also include avro imports
2019-03-03 19:53:48 UTC - Guy Feldman: yeah
2019-03-03 19:53:50 UTC - Guy Feldman: `from avro.schema import SchemaFromJSONData as make_avsc_object`
2019-03-03 19:54:16 UTC - Guy Feldman: then i just use the to_json method to get the json schema
2019-03-04 02:09:30 UTC - Byron: Hi folks. I am observing that the Reader.HasNext method in the Go client can return true even though there isn’t another message available. It may be my misunderstanding of the semantics of the method.
2019-03-04 02:13:06 UTC - Byron: My assumption is that this is a point-in-time lookup of whether the message is the latest and returns false if no more messages have been received and committed by the server.
2019-03-04 08:07:36 UTC - Sijie Guo: +1  we should probably allow user passing in an avro schema :slightly_smiling_face:
2019-03-04 08:09:00 UTC - Sijie Guo: @Matteo Merli @Yuvaraj Loganathan: I created a github issue for the AllowNull problem (<>) @CongBo @Penghui Li are working on the fix
2019-03-04 08:09:41 UTC - Yuvaraj Loganathan: Awesome! Thank You!