Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2019/03/04 09:11:07 UTC

Slack digest for #general - 2019-03-04

2019-03-03 09:35:58 UTC - Ben S: @Ben S has joined the channel
----
2019-03-03 09:40:02 UTC - Ben S: Hey all!
----
2019-03-03 09:40:36 UTC - Ali Ahmed: Hello
----
2019-03-03 09:44:16 UTC - Ben S: I have a problem I'm currently stuck on. Consider the following example:

- There are tens of thousands of shops around the world where internal processes should be recorded

- For each shop location the order of internal process tasks must be preserved

- There is no global list of shops. Process tasks may fly in from one location, without previous knowledge about that location

- Recording the process tasks is critical; no task may be missed (failover)

My understanding is that in order to preserve the order of processes within each location, I need to create a topic for each store location like this:

<persistent://public/default/store-san-francisco>

And for each topic I need to create an exclusive consumer with failovers.

The problem is that since I do not know the locations in advance, what is the best way to create and destroy consumers ad hoc for each unknown location?
----
2019-03-03 09:46:42 UTC - Ali Ahmed: you can create a consumer at the same time you start a producer on a new topic
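A minimal sketch of that suggestion, assuming a hypothetical naming rule that turns a location name into a topic like the `store-san-francisco` one above, and a hypothetical `subscribe` factory passed in by the caller. The registry subscribes the first time a location shows up, reuses that consumer afterwards, and closes it when the location is released:

```python
import re
from typing import Callable, Dict


def topic_for_location(location: str) -> str:
    """Hypothetical naming rule: 'San Francisco' -> .../store-san-francisco."""
    slug = re.sub(r"[^a-z0-9]+", "-", location.lower()).strip("-")
    return f"persistent://public/default/store-{slug}"


class LazyConsumerRegistry:
    """Keeps one consumer per location topic, created the first time it is needed."""

    def __init__(self, subscribe: Callable[[str], object]) -> None:
        # `subscribe` is a hypothetical factory that returns a consumer for a topic.
        self._subscribe = subscribe
        self._consumers: Dict[str, object] = {}

    def consumer_for(self, location: str) -> object:
        topic = topic_for_location(location)
        if topic not in self._consumers:
            # First task seen from this location: subscribe to its topic now.
            self._consumers[topic] = self._subscribe(topic)
        return self._consumers[topic]

    def release(self, location: str) -> None:
        """Forget a location's consumer, closing it if it has a close() method."""
        consumer = self._consumers.pop(topic_for_location(location), None)
        if consumer is not None and hasattr(consumer, "close"):
            consumer.close()
```

With the Python client, `subscribe` could be something like `lambda topic: client.subscribe(topic, 'tasks', consumer_type=ConsumerType.Failover)`, mirroring the `client.subscribe(...)` call used later in this thread. The subscription name `'tasks'` is made up for illustration.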
----
2019-03-03 09:55:14 UTC - Ben S: Oh, I realize I oversimplified my example. My actual case looks more complex, and different. I need to add one more point to the example:
- There are multiple processes. Ordering must be kept within each process. But in order for the system to scale, it is important that individual process steps can be parallelized. 
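A pattern not raised in this thread, offered only as a hedged illustration of "ordered within a process, parallel across processes": publish every task to one partitioned topic with the process ID as the message key. When a key is set, Pulsar's default routing hashes the key to choose a partition, so all steps of one process land on the same partition in arrival order, while different processes spread across partitions and can be consumed in parallel. The in-memory model below assumes `zlib.crc32` as a stand-in for the client's real hash function:

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Task = Tuple[str, int]  # (process_id, step_number)


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash: crc32 is stable across runs, unlike Python's hash().
    # The real Pulsar client uses its own hashing scheme.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(tasks: Iterable[Task], num_partitions: int) -> Dict[int, List[Task]]:
    """Append each task to its key's partition, preserving arrival order."""
    partitions: Dict[int, List[Task]] = defaultdict(list)
    for process_id, step in tasks:
        partitions[partition_for(process_id, num_partitions)].append((process_id, step))
    return partitions
```

Each partition would then have its own (failover) consumer; ordering is only guaranteed per key, not across keys.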
----
2019-03-03 09:57:35 UTC - Ali Ahmed: are you trying to replicate a traditional job queue with an arbitrary number of workers?
----
2019-03-03 09:59:47 UTC - Ben S: In some way, it is a combination of a tiny job queue with two consecutive tasks, but it's part of a real-time event system
----
2019-03-03 10:01:30 UTC - Ali Ahmed: you should be able to accomplish it with <https://pulsar.apache.org/docs/latest/functions/overview/>
----
2019-03-03 10:01:35 UTC - Ali Ahmed: <https://streaml.io/blog/eda-simple-event-processing>
----
2019-03-03 11:39:29 UTC - Ben S: Thank you, this looks interesting
----
2019-03-03 11:46:11 UTC - Yuvaraj Loganathan: Hi Team,
```
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.pulsar.client.api.*;
import schematest1.User;

import java.io.IOException;

public class SchemaConsumer {
	public static void main(String[] args) {
		PulsarClient client = null;
		Consumer<User> consumer = null;
		try {
			ReflectDatumReader<User> reader = new ReflectDatumReader<>(User.getClassSchema());
			client = PulsarClient.builder()
					.serviceUrl("pulsar://pulsar.stage.xx.xx:6650")
					.build();
			consumer = client.newConsumer(Schema.AVRO(User.class))
					.topic("persistent://public/default/schema-test1")
					.subscriptionType(SubscriptionType.Failover)
					.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
					.subscriptionName("test1")
					.subscribe();
			Message<User> msg = consumer.receive();

			consumer.close();
			client.close();
		} catch (PulsarClientException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}
```
How can I deserialize msg into a User object?
----
2019-03-03 11:48:35 UTC - Ali Ahmed: ```msg.getValue()```
----
2019-03-03 11:50:45 UTC - Yuvaraj Loganathan: @Ali Ahmed Thanks! Will raise a pull request for the docs! :slightly_smiling_face:
+1 : Sijie Guo
----
2019-03-03 15:40:03 UTC - Yuvaraj Loganathan: Hi Team,
```
import pulsar
from pulsar import ConsumerType, MessageId
from pulsar.schema import Record, String, Integer, AvroSchema

from schematest1.User import User

client = pulsar.Client('pulsar://pulsar.stage.xx.xx:6650')

consumer = client.subscribe('persistent://public/default/schema-test1', 'my-subscription1',
                            consumer_type=ConsumerType.Failover, schema=AvroSchema(User))
consumer.seek(MessageId.earliest)
while True:
    msg = consumer.receive()
    print(msg.data())

consumer.close()
client.close()
```


Trying to consume Avro messages from a topic produced by a Java producer. Receiving exception: Pulsar error: IncompatibleSchema
Python schema:
```
from pulsar.schema import Record, String, Integer, AvroSchema
class User(Record):
    name = String()
    favorite_number = Integer()
    favorite_color = String()
    age = Integer(default=18)
```
Here is the Avro schema:
```
{
  "namespace": "schematest1",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "favorite_color",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int",
      "default": 18
    }
  ]
}
```
----
2019-03-03 15:54:16 UTC - Matteo Merli: @Yuvaraj Loganathan Is this schema coming from the Java POJO?
----
2019-03-03 15:56:49 UTC - Matteo Merli: the differences that are probably affecting this are that `favorite_color` and `age` don't have the “null” type option. Can you try with `favorite_color = String(required=True)` and the same for age?
----
2019-03-03 16:55:39 UTC - Yuvaraj Loganathan: Yes
----
2019-03-03 16:55:58 UTC - Yuvaraj Loganathan: Here is the broker's schema GET response for the topic:
```
{
    "version": 0,
    "type": "AVRO",
    "timestamp": 0,
    "data": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"schematest1\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"favorite_number\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"favorite_color\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":\"int\"}]}",
    "properties": {}
}
```
----
2019-03-03 16:59:01 UTC - Yuvaraj Loganathan: Still the same response with:
```
from pulsar.schema import Record, String, Integer, AvroSchema
class User(Record):
    name = String(required=True)
    favorite_number = Integer()
    favorite_color = String(required=True)
    age = Integer(default=18, required=True)
```
----
2019-03-03 17:01:26 UTC - Yuvaraj Loganathan: is it possible to get the schema from the message so that we can decode the bytes manually?
----
2019-03-03 17:02:28 UTC - Matteo Merli: Yes, `User.schema()` should get you that 
----
2019-03-03 17:05:50 UTC - Matteo Merli: (maybe I misunderstood your question: currently it's not doing an “auto-deserialize” kind of thing.)
----
2019-03-03 17:06:00 UTC - Yuvaraj Loganathan: `{'name': 'User', 'type': 'record', 'fields': [{'name': 'age', 'type': 'int'}, {'name': 'favorite_color', 'type': 'string'}, {'name': 'favorite_number', 'type': ['null', 'int']}, {'name': 'name', 'type': 'string'}]}` This is the response
----
2019-03-03 17:07:05 UTC - Yuvaraj Loganathan: The namespace is missing in the `User.schema()` output. Is there a way to define the namespace in Python?
----
2019-03-03 17:09:11 UTC - Matteo Merli: No, but that's not what breaks the Avro validation. We have tests for these cases between Java & Python
----
2019-03-03 17:10:53 UTC - Matteo Merli: eg:
<https://github.com/apache/pulsar/blob/e9a5e61f06db9780669b39a96c5c29428334a0fe/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java>
<https://github.com/apache/pulsar/blob/master/tests/docker-images/latest-version-image/python-examples/producer_schema.py>
----
2019-03-03 17:14:42 UTC - Yuvaraj Loganathan: Thanks @Matteo Merli Will go through and come back
----
2019-03-03 17:15:20 UTC - Guy Feldman: FWIW I also had some issues with incompatible schema messages
----
2019-03-03 17:16:00 UTC - Matteo Merli: Indeed there might be some uncovered corners :slightly_smiling_face:
----
2019-03-03 17:16:22 UTC - Guy Feldman: I ended up resolving it by hitting the schema endpoints when creating the topic
----
2019-03-03 17:16:35 UTC - Matteo Merli: @Yuvaraj Loganathan can you share the Java POJO as well, to create a test case for that?
----
2019-03-03 17:20:18 UTC - Guy Feldman: @Yuvaraj Loganathan you also may want to add a default value of `null` for the favorite number field
----
2019-03-03 17:20:31 UTC - Guy Feldman: if you want it to be optional
----
2019-03-03 17:20:52 UTC - Matteo Merli: that shouldn’t be required, ideally..
----
2019-03-03 17:23:17 UTC - Guy Feldman: yeah i guess it's not
----
2019-03-03 17:24:21 UTC - Yuvaraj Loganathan: I first wrote the Avro schema file, then generated the Java classes from it and gave the generated POJO to the Pulsar producer.
----
2019-03-03 17:26:42 UTC - Matteo Merli: the thing is that avro doesn’t tell *why* it’s not compatible :confused:
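Since the compatibility check doesn't say *why* two schemas differ, a small field-by-field diff can help narrow it down. A minimal stdlib sketch, assuming both schemas are flat Avro records already loaded as dicts (for example, `json.loads` of the `data` string in the broker response above, and the dict returned by `User.schema()`); the helper name `diff_record_schemas` is hypothetical. It also flags differences, such as a missing namespace, that per the thread don't by themselves break validation:

```python
from typing import Any, Dict, List

_ABSENT = "<absent>"  # marker for a missing "namespace" or "default" key


def diff_record_schemas(a: Dict[str, Any], b: Dict[str, Any]) -> List[str]:
    """List the differences between two flat Avro record schemas."""
    problems = []
    for key in ("name", "namespace"):
        if a.get(key, _ABSENT) != b.get(key, _ABSENT):
            problems.append(f"{key}: {a.get(key, _ABSENT)!r} vs {b.get(key, _ABSENT)!r}")
    fields_a = {f["name"]: f for f in a["fields"]}
    fields_b = {f["name"]: f for f in b["fields"]}
    for name in sorted(fields_a.keys() | fields_b.keys()):
        fa, fb = fields_a.get(name), fields_b.get(name)
        if fa is None or fb is None:
            problems.append(f"{name}: only in {'first' if fb is None else 'second'} schema")
            continue
        if fa["type"] != fb["type"]:
            problems.append(f"{name}: type {fa['type']!r} vs {fb['type']!r}")
        default_a = fa.get("default", _ABSENT)
        default_b = fb.get("default", _ABSENT)
        if default_a != default_b:
            problems.append(f"{name}: default {default_a!r} vs {default_b!r}")
    # Avro binary data is laid out in field order, so report order changes too.
    order_a = [f["name"] for f in a["fields"]]
    order_b = [f["name"] for f in b["fields"]]
    if order_a != order_b:
        problems.append(f"field order: {order_a} vs {order_b}")
    return problems
```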
----
2019-03-03 17:27:13 UTC - Yuvaraj Loganathan: This is working for me now:
```
from pulsar.schema import Record, String, Integer, AvroSchema
class User(Record):
    name = String()
    favorite_number = Integer()
    favorite_color = String()
    age = Integer(default=18, required=True)
```
The broker schema response is:
```
{
    "version": 0,
    "type": "AVRO",
    "timestamp": 0,
    "data": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"schematest1\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"favorite_number\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"favorite_color\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"age\",\"type\":\"int\"}]}",
    "properties": {}
}
```
----
2019-03-03 17:28:35 UTC - Yuvaraj Loganathan: `User.schema()` response is
```
{"name": "User", "type": "record", "fields": [{"name": "age", "type": "int"}, {"name": "favorite_color", "type": ["null", "string"]}, {"name": "favorite_number", "type": ["null", "int"]}, {"name": "name", "type": ["null", "string"]}]}
```
----
2019-03-03 17:29:17 UTC - Matteo Merli: I guess that when a field has `default=xyz` we could automatically treat it as `required=True`, because the value will always be there?
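A sketch of the field mapping as observed in this thread (`String()` became `["null", "string"]`, while `required=True` produced a bare type, and `age`'s default did not appear in the `User.schema()` output), with Matteo's suggestion behind a flag. The function and the `default_implies_required` flag are hypothetical, not the Python client's code, and the result for `Integer(default=18)` without `required` is inferred rather than shown in the thread:

```python
from typing import Any, Optional


def record_field_to_avro(name: str, avro_type: str, required: bool = False,
                         default: Optional[Any] = None,
                         default_implies_required: bool = False) -> dict:
    """Map a Python Record field declaration to an Avro field entry."""
    if default_implies_required and default is not None:
        # Proposal: a field with a default always carries a value,
        # so it does not need the "null" branch.
        required = True
    # Observed behaviour: optional fields become a union with "null" first.
    # The default itself is not emitted, matching the User.schema() output above.
    field_type = avro_type if required else ["null", avro_type]
    return {"name": name, "type": field_type}
```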
----
2019-03-03 17:30:42 UTC - Yuvaraj Loganathan: Now receiving this error
```
Traceback (most recent call last):
  File "/home/uva/PycharmProjects/learn/AvroConsumer.py", line 16, in <module>
    msg.value()
  File "/home/uva/yyyy/sources/xxxxx/learn/lib/python3.5/site-packages/pulsar/__init__.py", line 156, in value
    return self._schema.decode(self._message.data())
  File "/home/uva/beam/sources/aws-sc-worker/learn/lib/python3.5/site-packages/pulsar/schema/schema.py", line 103, in decode
    d = fastavro.schemaless_reader(buffer, self._schema)
  File "fastavro/_read.pyx", line 763, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 773, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 564, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 460, in fastavro._read.read_record
  File "fastavro/_read.pyx", line 562, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 435, in fastavro._read.read_union
IndexError: list index out of range
```
----
2019-03-03 17:32:09 UTC - Yuvaraj Loganathan: Yes!
----
2019-03-03 17:32:38 UTC - Yuvaraj Loganathan: Once I matched the broker schema and `User.schema()`, the IncompatibleSchema error went away, but the error above appears instead.
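One plausible contributor, not confirmed in this thread: the traceback shows the Python client decoding with `fastavro.schemaless_reader(buffer, self._schema)`, i.e. with only its own schema, and `User.schema()` lists the fields alphabetically (`age` first), while the broker schema written from Java starts with `name`. Avro binary encodes record fields back to back in declared order, and a union as a zigzag-varint branch index followed by the value, so reading with a different field order consumes the wrong bytes. A minimal stdlib model of that encoding (only null, int/long, string, and unions; the sample record values are made up) reproduces an out-of-range union index:

```python
from typing import Any, Dict, List, Tuple

# Field lists in declared order: writer = broker schema, reader = User.schema().
WRITER_FIELDS = [("name", ["null", "string"]), ("favorite_number", ["null", "int"]),
                 ("favorite_color", ["null", "string"]), ("age", "int")]
READER_FIELDS = [("age", "int"), ("favorite_color", ["null", "string"]),
                 ("favorite_number", ["null", "int"]), ("name", ["null", "string"])]
PY_TYPES = {"null": type(None), "int": int, "long": int, "string": str}


def write_long(out: bytearray, n: int) -> None:
    z = (n << 1) ^ (n >> 63)  # zigzag, then 7 bits per byte, low bits first
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)


def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    z, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        z |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return (z >> 1) ^ -(z & 1), pos  # undo zigzag
        shift += 7


def write_value(out: bytearray, schema: Any, value: Any) -> None:
    if isinstance(schema, list):  # union: branch index, then the value
        index = next(i for i, b in enumerate(schema) if isinstance(value, PY_TYPES[b]))
        write_long(out, index)
        write_value(out, schema[index], value)
    elif schema in ("int", "long"):
        write_long(out, value)
    elif schema == "string":
        data = value.encode("utf-8")
        write_long(out, len(data))
        out.extend(data)
    # "null" is encoded as zero bytes


def read_value(buf: bytes, pos: int, schema: Any) -> Tuple[Any, int]:
    if isinstance(schema, list):
        index, pos = read_long(buf, pos)
        if not 0 <= index < len(schema):
            raise IndexError(f"union branch {index} out of range")
        return read_value(buf, pos, schema[index])
    if schema in ("int", "long"):
        return read_long(buf, pos)
    if schema == "string":
        length, pos = read_long(buf, pos)
        return buf[pos:pos + length].decode("utf-8"), pos + length
    return None, pos  # "null"


def encode_record(fields: List[Tuple[str, Any]], record: Dict[str, Any]) -> bytes:
    out = bytearray()
    for name, schema in fields:  # no field names or tags are written: order is everything
        write_value(out, schema, record[name])
    return bytes(out)


def decode_record(fields: List[Tuple[str, Any]], buf: bytes) -> Dict[str, Any]:
    record, pos = {}, 0
    for name, schema in fields:
        record[name], pos = read_value(buf, pos, schema)
    return record
```

Decoding with `WRITER_FIELDS` round-trips. Decoding the same bytes with `READER_FIELDS` reads `age` from the first union index byte and then finds branch 6 of a two-branch union, an `IndexError` much like the one in the traceback. If this is the cause, a reader whose fields follow the writer's order should avoid it.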
----
2019-03-03 17:33:08 UTC - Matteo Merli: That I haven’t seen yet
----
2019-03-03 17:33:29 UTC - Matteo Merli: I’ll try with the above schema definitions later
----
2019-03-03 17:38:52 UTC - Guy Feldman: @Yuvaraj Loganathan i don't know if this is related, but are you using the avro maven plugin
----
2019-03-03 17:39:01 UTC - Yuvaraj Loganathan: @Guy Feldman Yes
----
2019-03-03 17:39:14 UTC - Guy Feldman: Did you define a string type configuration
----
2019-03-03 17:40:08 UTC - Guy Feldman: nm it shouldn't matter
----
2019-03-03 17:40:15 UTC - Yuvaraj Loganathan: No.
----
2019-03-03 17:40:29 UTC - Yuvaraj Loganathan: I have generated the POJO using avro maven plugin
----
2019-03-03 17:40:34 UTC - Guy Feldman: by default the maven plugin generates `CharSequence` fields rather than `String`
----
2019-03-03 17:40:45 UTC - Guy Feldman: you have to tell it to use a string object
----
2019-03-03 17:40:48 UTC - Yuvaraj Loganathan: Ah..
----
2019-03-03 17:41:09 UTC - Yuvaraj Loganathan: Also, my schema is:
```
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "favorite_color",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int",
      "default": 18
    }
  ]
}
```
----
2019-03-03 17:41:11 UTC - Guy Feldman: but that's serde on the java side. I don't think it should matter
----
2019-03-03 17:41:26 UTC - Matteo Merli: can you also share the generated Java POJO ?
----
2019-03-03 17:43:09 UTC - Guy Feldman: gonna try a python consumer with my topics which use avro
----
2019-03-03 17:48:48 UTC - Yuvaraj Loganathan: @Matteo Merli <https://gist.github.com/skyrocknroll/dcf73dfb1940695d1ad742f069ce9a9f>
+1 : Matteo Merli
----
2019-03-03 17:56:24 UTC - Sijie Guo: there is a problem with the current Avro POJO schema
----
2019-03-03 17:59:25 UTC - Sijie Guo: in the Java Avro POJO schema, it always converts the fields into nullable ones. <https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java#L103>
----
2019-03-03 18:00:23 UTC - Matteo Merli: Oh, I thought that was Avro default setting
----
2019-03-03 18:00:28 UTC - Sijie Guo: If you are using a schema whose fields don't use a nullable union, you will run into incompatibility problems.
----
2019-03-03 18:00:29 UTC - Sijie Guo: I think AllowNull was added by mistake
----
2019-03-03 18:00:32 UTC - Matteo Merli: that’s why I did the same behavior in Python
----
2019-03-03 18:02:05 UTC - Sijie Guo: that's not correct. It is actually causing a lot of confusion. @CongBo or @Penghui Li found the problem and they said they will send out a fix for it.
----
2019-03-03 18:02:26 UTC - Sijie Guo: We should just let Avro handle the schema generation
----
2019-03-03 18:02:35 UTC - Sijie Guo: we don’t need to add AllowNull
----
2019-03-03 18:02:36 UTC - Matteo Merli: uhm, the fix will likely be incompatible
----
2019-03-03 18:02:50 UTC - Sijie Guo: yes
----
2019-03-03 18:03:05 UTC - Sijie Guo: but it's already in a very weird state
----
2019-03-03 18:03:06 UTC - Sijie Guo: :disappointed:
----
2019-03-03 18:03:54 UTC - Yuvaraj Loganathan: I'm seeing the same behaviour: compared with my schema file, the broker-registered schema has an extra null.
----
2019-03-03 18:04:14 UTC - Sijie Guo: yes
----
2019-03-03 18:04:42 UTC - Sijie Guo: the extra null was introduced by `AllowNull`. `AllowNull` converts the schema type into a union type.
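A simplified model of that conversion, assuming the generated POJO stores non-union `int`/`long`/`float`/`double`/`boolean` fields as Java primitives (which cannot hold null, so they keep their plain type) and every other field as an object type that gets a `"null"` branch first plus `"default": null`. Declared defaults are dropped, as the broker schema above shows for `age`. This is not Avro's actual `ReflectData.AllowNull` code, just enough to reproduce the broker schema from the `.avsc` file posted earlier:

```python
from typing import Any, Dict

# Assumption: these non-union Avro types become Java primitives in generated code.
JAVA_PRIMITIVES = {"int", "long", "float", "double", "boolean"}


def allow_null_view(avsc: Dict[str, Any]) -> Dict[str, Any]:
    """Approximate the schema registered for a POJO after AllowNull reflection."""
    fields = []
    for field in avsc["fields"]:
        ftype = field["type"]
        if isinstance(ftype, str) and ftype in JAVA_PRIMITIVES:
            # Primitive fields cannot hold null, so they keep their plain type.
            fields.append({"name": field["name"], "type": ftype})
            continue
        # Object fields: drop any existing "null" branch, then put "null" first.
        branches = ftype if isinstance(ftype, list) else [ftype]
        non_null = [b for b in branches if b != "null"]
        fields.append({"name": field["name"], "type": ["null"] + non_null, "default": None})
    return {"type": "record", "name": avsc["name"],
            "namespace": avsc.get("namespace"), "fields": fields}
```

Run on the `.avsc` with namespace `schematest1`, this yields the same field types, defaults, and order as the `data` string in the broker response, which is why a Python reader built from the original `.avsc` (non-null `string` fields) did not match.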
----
2019-03-03 18:04:51 UTC - Sijie Guo: s/schema type/field type
----
2019-03-03 18:05:59 UTC - Sijie Guo: basically, there is currently no way to specify non-null fields in Pulsar's Avro POJO schema.
----
2019-03-03 18:06:15 UTC - Sijie Guo: a safe way to use AvroSchema is to use GenericSchema
----
2019-03-03 18:06:19 UTC - Yuvaraj Loganathan: Also, I do see `default: null` in the broker-registered schema for all the string fields
----
2019-03-03 18:06:43 UTC - Sijie Guo: where you can use Avro to build the schema to get around the pojo schema issue
----
2019-03-03 18:07:27 UTC - Yuvaraj Loganathan: Ok Thanks @Sijie Guo
----
2019-03-03 18:07:48 UTC - Yuvaraj Loganathan: <https://pastebin.com/NyNcbse0> here is the broker schema for this avro schema <https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1551634869148400>
----
2019-03-03 18:08:05 UTC - Sijie Guo: @Matteo Merli - for backward compatibility (BC), we can rename the AVRO one to AllowNullAvro
----
2019-03-03 18:08:28 UTC - Sijie Guo: and just introduce a new AVRO schema which doesn’t have AllowNull
----
2019-03-03 18:08:31 UTC - Sijie Guo: sorry
----
2019-03-03 18:08:34 UTC - Sijie Guo: keep the current one
----
2019-03-03 18:08:36 UTC - Matteo Merli: but, in terms of avro auto-generated code, we should extract the same schema as well. So how does one define a “nullable” field in the pojo?
----
2019-03-03 18:08:39 UTC - Sijie Guo: just introduce a new one
----
2019-03-03 18:09:01 UTC - Matteo Merli: yes, though having 2 is not ideal either :confused:
+1 : Yuvaraj Loganathan
----
2019-03-03 18:09:27 UTC - Sijie Guo: > So how does one define a “nullable” field in the pojo

@Matteo Merli: currently it is always ‘nullable’ because of AllowNull
----
2019-03-03 18:09:51 UTC - Matteo Merli: maybe an internal versioning of our Avro handling, to allow migrating to new type
----
2019-03-03 18:10:05 UTC - Sijie Guo: AllowNull is just a nightmare; @Penghui Li and @CongBo spent quite a lot of time troubleshooting the issue :slightly_smiling_face:
----
2019-03-03 18:11:08 UTC - Matteo Merli: (yes, and that combined with no specific errors from avro..)
----
2019-03-03 18:13:12 UTC - Sijie Guo: @Matteo Merli: I think it might be worth checking with @Jerry Peng if there are other considerations regarding AllowNull. (based on what I learned, it is not needed)
 <https://github.com/apache/pulsar/pull/1917>
----
2019-03-03 18:15:46 UTC - Matteo Merli: Yes
----
2019-03-03 18:16:35 UTC - Matteo Merli: surely there will be some reason. Let’s check out the options there
----
2019-03-03 19:43:18 UTC - Guy Feldman: I had trouble serializing a dictionary into this type using the standard avro library
----
2019-03-03 19:44:19 UTC - Guy Feldman: default value for age doesn't work
----
2019-03-03 19:44:33 UTC - Guy Feldman: when I specified an age it worked fine
----
2019-03-03 19:49:53 UTC - Guy Feldman: I think a more general solution is to have the user provide a class with the schema method implemented.
----
2019-03-03 19:50:44 UTC - Guy Feldman: This way they can parse avro schema with the python avro library
----
2019-03-03 19:50:51 UTC - Guy Feldman: or provide their own
----
2019-03-03 19:51:15 UTC - Matteo Merli: though that won’t guarantee that the data complies with such schema
----
2019-03-03 19:51:42 UTC - Guy Feldman: 
----
2019-03-03 19:52:18 UTC - Matteo Merli: :slightly_smiling_face:
----
2019-03-03 19:52:39 UTC - Guy Feldman: alternatively, we can use the python avro library for reading the schemas
----
2019-03-03 19:53:14 UTC - Matteo Merli: reading the json?
----
2019-03-03 19:53:22 UTC - Guy Feldman: that's what i'm doing because my schemas also include avro imports
----
2019-03-03 19:53:48 UTC - Guy Feldman: yeah
----
2019-03-03 19:53:50 UTC - Guy Feldman: `from avro.schema import SchemaFromJSONData as make_avsc_object`
----
2019-03-03 19:54:16 UTC - Guy Feldman: then i just use the to_json method to get the json schema
----
2019-03-04 02:09:30 UTC - Byron: Hi folks. I am observing that the Reader.HasNext method in the Go client can return true even though there isn’t another message available. It may be my misunderstanding of the semantics of the method.
----
2019-03-04 02:13:06 UTC - Byron: My assumption is that this is a point-in-time lookup of whether the message is the latest and returns false if no more messages have been received and committed by the server.
----
2019-03-04 08:07:36 UTC - Sijie Guo: +1, we should probably allow users to pass in an Avro schema :slightly_smiling_face:
----
2019-03-04 08:09:00 UTC - Sijie Guo: @Matteo Merli @Yuvaraj Loganathan: I created a github issue for the AllowNull problem (<https://github.com/apache/pulsar/issues/3741>) @CongBo @Penghui Li are working on the fix
----
2019-03-04 08:09:41 UTC - Yuvaraj Loganathan: Awesome! Thank You!
----