Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/02/21 09:11:03 UTC

Slack digest for #general - 2020-02-21

2020-02-20 09:46:20 UTC - Yuvaraj Loganathan: For JWT, you need to add the header:
```Authorization: Bearer xxx```
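A minimal Java sketch of attaching the JWT as a bearer token with the JDK's `java.net.http` client (Java 11+), assuming the request goes to the Pulsar admin REST API. The URL `http://localhost:8080/admin/v2/clusters`, the `PULSAR_TOKEN` environment variable, and the `xxx` fallback are placeholders, not values from the thread.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JwtHeaderExample {

    // Builds a GET request that carries the JWT in the standard
    // "Authorization: Bearer <token>" header.
    public static HttpRequest buildRequest(String url, String token) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", "Bearer " + token)
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Placeholder values: replace with your admin URL and a real token.
        String url = "http://localhost:8080/admin/v2/clusters";
        String token = System.getenv().getOrDefault("PULSAR_TOKEN", "xxx");

        HttpRequest request = buildRequest(url, token);
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

The same header works with curl (`-H "Authorization: Bearer xxx"`) or any other HTTP client.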
----
2020-02-20 09:57:11 UTC - Steven Op de beeck: @Steven Op de beeck has joined the channel
----
2020-02-20 10:21:06 UTC - Steven Op de beeck: Hi all, I'm new to Pulsar. I'm using the pulsar-io-debezium-postgres connector. I'd like to translate source data (value) from the debezium connector into plain JSON, avoiding the overhead of AVRO schema data. What's the best way to approach this? /cc @jia zhai
a. Define a Pulsar function ?
b. Modify the Pulsar Debezium Connector?
c. Some configuration I'm unaware of?
----
2020-02-20 10:23:24 UTC - jia zhai: @Steven Le Roux Debezium should already support the JSON schema type; you could change the config YAML file.
----
2020-02-20 10:23:58 UTC - Steven Op de beeck: This is the file I'm using now:
```tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-producer-topic"
archive: "connectors/pulsar-io-debezium-postgres-2.5.0.nar"
parallelism: 1

configs:
  database.hostname: "producerdb"
  database.port: "5432"
  database.user: "debezium"
  database.password: "dbz"
  database.dbname: "pocdb"
  database.server.name: "producerdb"
  key.converter.schemas.enable: false
  key.converter: "org.apache.kafka.connect.json.JsonConverter"
  value.converter.schemas.enable: false
  value.converter: "org.apache.kafka.connect.json.JsonConverter"
  table.whitelist: "public.employee_update_event"
  pulsar.service.url: "pulsar://pulsar-es:6650"```
----
2020-02-20 10:24:23 UTC - jia zhai: right, the converter
----
2020-02-20 10:25:03 UTC - jia zhai: And after the data is stored in Pulsar, we should use Pulsar’s KeyValue schema type.
----
2020-02-20 10:25:47 UTC - Steven Op de beeck: It still outputs a giant json with schemas inside.
----
2020-02-20 10:25:54 UTC - jia zhai: @tuteng I recall there is some example code showing how to read data out of Pulsar?
----
2020-02-20 10:25:56 UTC - Steven Op de beeck: ```{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"producerdb.public.employee_update_event.Key"},"payload":{"id":388}�{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"int64","optional":true,"field":"aggregate_id"},{"type":"string","optional":true,"field":"aggregate_type"},{"type":"string","optional":true,"field":"content"},{"type":"string","optional":true,"field":"event_type"},{"type":"int64","optional":false,"field":"timestamp"}],"optional":true,"name":"producerdb.public.employee_update_event.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"int64","optional":true,"field":"aggregate_id"},{"type":"string","optional":true,"field":"aggregate_type"},{"type":"string","optional":true,"field":"content"},{"type":"string","optional":true,"field":"event_type"},{"type":"int64","optional":false,"field":"timestamp"}],"optional":true,"name":"producerdb.public.employee_update_event.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field"
:"ts_ms"}],"optional":false,"name":"producerdb.public.employee_update_event.Envelope"},"payload":{"before":{"id":388,"aggregate_id":null,"aggregate_type":null,"content":null,"event_type":null,"timestamp":0},"after":null,"source":{"version":"0.10.0.Final","connector":"postgresql","name":"producerdb","ts_ms":1582194291258,"snapshot":"false","db":"pocdb","schema":"public","table":"employee_update_event","txId":885,"lsn":24883520,"xmin":null},"op":"d","ts_ms":1582194291263}}```
----
2020-02-20 10:27:28 UTC - Steven Op de beeck: You're suggesting that I shouldn't bother with what the JSON looks like on Pulsar, and should use Pulsar KeyValue to get the values out, ignoring what is actually stored on Pulsar?
----
2020-02-20 10:28:26 UTC - Steven Op de beeck: My current mindset is that we won't need the AVRO schema, and would like to get rid of the overhead it generates.
----
2020-02-20 10:31:06 UTC - jia zhai: Right, the Pulsar KeyValue schema should be able to serialize/deserialize the messages in Pulsar.
----
2020-02-20 10:34:09 UTC - Steven Op de beeck: ```value.converter.schemas.enable: false```
Is this not implemented in the debezium-pulsar connector, then? Or does it not mean what I think it means?
----
2020-02-20 10:41:05 UTC - Steven Op de beeck: This is how we're currently consuming Pulsar data:
```private void received(Consumer<byte[]> consumer, Message<byte[]> message) {
    LocalDateTime messageReceivedTimestamp = now();
    // Decode explicitly as UTF-8 (java.nio.charset.StandardCharsets) instead of the platform default
    String messageText = new String(message.getData(), StandardCharsets.UTF_8);

    log.info("Message ({}) received @ {}, value: {}", message.getMessageId(), messageReceivedTimestamp, messageText);
    writeToPerformanceLog(messageReceivedTimestamp, messageText);
    try {
        consumer.acknowledge(message);
    } catch (PulsarClientException e) {
        throw new RuntimeException(e);
    }
}```
----
2020-02-20 10:41:32 UTC - Steven Op de beeck: I can do getValue, sure; but I'm just wondering if I can get rid of the AVRO overhead.
----
2020-02-20 10:43:14 UTC - tuteng: Sorry, there are some review comments on <https://github.com/apache/pulsar/pull/6034> that I haven't had time to address yet. I will fix these problems as soon as possible.
----
2020-02-20 10:50:45 UTC - Steven Op de beeck: Or am I totally wrong about all this, and the output example above is not AVRO at all, just plain Pulsar? :fearful: In that case I'm worrying about nothing.
----
2020-02-20 10:56:36 UTC - Steven Op de beeck: Under that assumption, the question is: how do we consume what the Debezium connector produces? What's the consumer schema?
```pulsarClient
        .newConsumer(???)
        .topic(topic)
        .messageListener(this::received)
        .subscribe();```
----
2020-02-20 12:25:23 UTC - Steven Op de beeck: We've verified that the KeyValue payload on Pulsar is specific to the Debezium connector, not a Pulsar technicality. We're puzzled about how to decode it on the consumer end.
----
2020-02-20 12:46:07 UTC - Miroslav Prymek: Could anyone please show me a valid protobuf schema in JSON format suitable for `pulsar-admin schemas upload` ?

I'm trying things like
```{
    "properties": {},
    "schema": "syntax = \"proto2\";\n\npackage proto;\n\noption java_package = \"fi.hsl.common.mqtt.proto\";\noption java_outer_classname = \"Mqtt\";\n\nmessage RawMessage {\n    required int32 SchemaVersion = 1 [default = 1];\n    optional string topic = 2;\n    optional bytes payload = 3;\n}\n",
    "type": "PROTOBUF"
}```
but without luck.

The point is, I want to consume protobuf-serialized messages from <https://github.com/HSLdevcom/mqtt-pulsar-gateway> in a Java Pulsar function. I can decode the protobuf by hand, but that is not a very clean solution...
----
2020-02-20 12:47:08 UTC - Miroslav Prymek: BTW, it's very unfortunate that `schemas upload` returns just
```HTTP 500 Internal Server Error

Reason: HTTP 500 Internal Server Error```
when the schema file is not valid :disappointed:
----
2020-02-20 13:20:51 UTC - rani: @rani has joined the channel
----
2020-02-20 13:43:21 UTC - Steve Kim: I apologize if this question has already been asked. (I searched in Slack history and did not find any relevant previous threads.) Is there a plan to start publishing the Python client as a Conda package (<https://docs.conda.io/projects/conda/en/latest/user-guide/concepts/packages.html>) that is distributed through a conda channel (e.g. <https://conda-forge.org>) in addition to the wheel files at <https://pypi.org/project/pulsar-client/> ?
----
2020-02-20 14:12:06 UTC - Miroslav Prymek: OK, I understand it now: the `schema` property should be an Avro JSON schema even when the data is serialized using protobuf.
----
2020-02-20 15:15:27 UTC - tcourdy: Ah, so the functions are stored in the BookKeeper replicas, not the broker replicas?
----
2020-02-20 15:15:50 UTC - tcourdy: So then I should set `numFunctionPackageReplicas` to the number of BookKeeper replicas I have?
----
2020-02-20 15:44:16 UTC - Devin G. Bost: If that hasn't been done yet, I think that's a great idea. Could you please create a feature request in the Pulsar GitHub issues?
----
2020-02-20 16:06:27 UTC - id3a: @id3a has joined the channel
----
2020-02-20 16:09:30 UTC - sathish: @sathish has joined the channel
----
2020-02-20 16:19:37 UTC - Rolf Arne Corneliussen: Thanks for your input, @Antti Kaikkonen. I have tried to read up on Pulsar, which is a complex system, but it seems very different from Kafka. For example, in Kafka the partitions of a topic are distributed among the members of a consumer group, and each consumer gets callbacks when a partition is assigned to or unassigned from it. I cannot see anything similar in Pulsar. Even if you use the Key_Shared subscription mode, the subscriber has no idea of the range of keys it handles (unless you use a sticky hash range, but that needs coordination to be fault tolerant). A min-heap on top of the table service with a pluggable 'compare' function would be nice :-)
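A rough Java sketch of the sticky-hash-range idea: if a consumer knows its assigned range, it can check locally whether a given key belongs to it. This assumes a key hash space of 65,536 slots (which I believe is Pulsar's Key_Shared default) and uses `String.hashCode()` purely as a stand-in; Pulsar itself hashes keys with Murmur3, so real slot numbers will differ. `StickyRangeSketch`, `slotFor`, and `handles` are hypothetical names.

```java
public class StickyRangeSketch {

    // Assumed size of the key hash space (slots 0..65535).
    public static final int HASH_RANGE_SIZE = 65536;

    // Stand-in hash: NOT Pulsar's Murmur3 hash, just a deterministic example.
    public static int slotFor(String key) {
        return Math.floorMod(key.hashCode(), HASH_RANGE_SIZE);
    }

    // A consumer that owns the inclusive range [start, end] handles a key
    // only if the key's slot falls inside that range.
    public static boolean handles(String key, int start, int end) {
        int slot = slotFor(key);
        return slot >= start && slot <= end;
    }

    public static void main(String[] args) {
        String[] keys = {"order-1", "order-2", "order-3"};
        for (String key : keys) {
            // Two hypothetical consumers splitting the space in half.
            String owner = handles(key, 0, 32767) ? "consumer A" : "consumer B";
            System.out.println(key + " -> slot " + slotFor(key) + " -> " + owner);
        }
    }
}
```

This only covers the membership check; as noted above, something still has to reassign a dead consumer's range for the scheme to be fault tolerant.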
----
2020-02-20 16:31:56 UTC - Devin G. Bost: In my experience, JSON is usually more expensive to convert to/from than binary types like Avro. (Sometimes a LOT more expensive.)
----
2020-02-20 16:32:39 UTC - Devin G. Bost: Also, the schema features of Pulsar are not dependent on Avro.
----
2020-02-20 16:33:14 UTC - Devin G. Bost: What do you mean specifically by "overhead"?
----
2020-02-20 17:58:42 UTC - Mikhail Veygman: Hi.
----
2020-02-20 17:59:47 UTC - Mikhail Veygman: When sending asynchronously from the producer, it creates a Future<MessageId> object which then completes (I am assuming through a thread pool in the background). Is the number of threads in that pool configurable?
----
2020-02-20 18:01:18 UTC - Matteo Merli: Yes: <https://pulsar.apache.org/api/client/2.5.0-SNAPSHOT/org/apache/pulsar/client/api/ClientBuilder.html#ioThreads-int->
----
2020-02-20 18:05:31 UTC - Mikhail Veygman: Oh! OK. A sort-of documented feature.
----
2020-02-20 21:20:48 UTC - Antti Kaikkonen: I think that with exclusive/failover mode a partition is assigned to a single consumer, but the problem is that if the consumer dies, the partition is reassigned and there isn't a callback mechanism like the one you described.

Concurrent updates might be problematic with a heap.
----
2020-02-20 23:13:13 UTC - Tamer: Is there any recommendation on how to automate Pulsar function updates from a CI/CD pipeline?

Should I just embed the pulsar-admin cli in my build process?
----
2020-02-21 00:10:14 UTC - Ali Ahmed: @Abhilash Mandaliya
<https://github.com/aahmed-se/pulsar-io-gradle>
----
2020-02-21 00:23:47 UTC - Devin G. Bost: We ran into that issue before. The admin-cli was really slow for our purposes. It’s better to hit the REST endpoint directly.
----
2020-02-21 00:23:58 UTC - Devin G. Bost: (Admin REST endpoint.)
----
2020-02-21 01:52:26 UTC - Ken Huang: Hi @David Kjerrumgaard, I used your Dockerfile to run a new Pulsar instance:
```docker run -it \
 -p 6650:6650 \
 -p 8080:8080 \
 -p 8081:8081 \
 -p 8443:8443 \
 -p 6651:6651 \
 -p 8888:8888 \
 pulsar-in-action:latest```
It works fine, but when I try to grant permissions, I get the error "Authorization is not enabled   Reason: HTTP 501 Not Implemented".

What can I do to fix it? Thank you very much.
----
2020-02-21 01:53:21 UTC - Ken Huang: This is the command I use for grant-permission:
```./bin/pulsar-admin namespaces grant-permission public/default \
      --role webapp \
      --actions produce,consume```
----
2020-02-21 04:38:51 UTC - Tamer: Thanks @Devin G. Bost, that sounds like a good option. I can either use the PulsarAdmin API or just hit the REST endpoint with curl.

How did you pass the jar file path?
Do you copy it to the worker host (e.g. with scp), or use an HTTP URL for the jar?
----
2020-02-21 04:40:57 UTC - Devin G. Bost: I recommend the REST endpoint directly.
Our first automation path was the CLI. That didn’t last long.
Our second automation path was the PulsarAdmin API. That lasted longer, but it still was a source of frustration.
Our third automation path was to write a Go consumer that listened to a Pulsar topic, got our desired data, and hit the REST endpoint directly. That worked amazingly well and was much faster than all of the others. It’s still running in production.
----
2020-02-21 04:41:51 UTC - Devin G. Bost: All you need is a URL where Pulsar can download the Jar, so you can use any cloud storage for that.
----
2020-02-21 04:43:16 UTC - Devin G. Bost: I’ve been working on getting permission for our team to open source our deployment tool. That effort is currently in progress.
----
2020-02-21 05:31:57 UTC - Tamer: Thank you so much for the details. I will use the REST API directly. Maybe later I will consider writing a small wrapper in Rust to get a fast CLI.

When you update a function, do you just make the `update` API call?

I've noticed that when I manually update a function, it occasionally fails with a 503 error, and I need to delete and recreate the function to fix it.

Have you ever had the update call fail like this? Do you stop the function first?
----
2020-02-21 05:33:34 UTC - Devin G. Bost: Hmm, the 500 errors sound familiar. What version are you running?
----
2020-02-21 05:35:24 UTC - Devin G. Bost: It’s possible that there’s an unresolved bug with the update process.
It would be extremely helpful if you could please capture log details when that happens again and create a GitHub issue so the bug report can go through triage and get looked at in detail. (Basically, the way the Pulsar community does things is that the issue must be reported as a GitHub issue before it can be resolved.)
----
2020-02-21 05:35:38 UTC - Devin G. Bost: The more detail you can provide the better.
----
2020-02-21 05:36:29 UTC - Devin G. Bost: Yes, I believe we make the `update`  API call. The documentation wasn’t great, so if you need help, I can take a look at how we’ve done it.
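A minimal Java sketch of that `update` call made directly against the Admin REST API, using only the JDK HTTP client and a package URL instead of uploading the jar. It assumes the v3 functions endpoint (`PUT /admin/v3/functions/{tenant}/{namespace}/{functionName}`) and the multipart form fields `url` and `functionConfig` as I understand them; please check both against the REST API docs for your Pulsar version. The admin URL, function name, jar URL, and config JSON are all placeholders.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.UUID;

public class FunctionUpdateSketch {

    // Builds a multipart/form-data body with a plain-text "url" part and a JSON "functionConfig" part.
    public static String multipartBody(String boundary, String pkgUrl, String configJson) {
        String crlf = "\r\n";
        return "--" + boundary + crlf
                + "Content-Disposition: form-data; name=\"url\"" + crlf + crlf
                + pkgUrl + crlf
                + "--" + boundary + crlf
                + "Content-Disposition: form-data; name=\"functionConfig\"" + crlf
                + "Content-Type: application/json" + crlf + crlf
                + configJson + crlf
                + "--" + boundary + "--" + crlf;
    }

    // Builds (but does not send) the PUT request for the assumed v3 endpoint.
    public static HttpRequest buildUpdate(String adminUrl, String tenant, String namespace,
                                          String name, String pkgUrl, String configJson) {
        String boundary = "----pulsar-" + UUID.randomUUID();
        URI uri = URI.create(adminUrl + "/admin/v3/functions/"
                + tenant + "/" + namespace + "/" + name);
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "multipart/form-data; boundary=" + boundary)
                .PUT(HttpRequest.BodyPublishers.ofString(
                        multipartBody(boundary, pkgUrl, configJson)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Placeholder config; your version may require other fields (e.g. runtime settings).
        String configJson = "{\"tenant\":\"public\",\"namespace\":\"default\",\"name\":\"my-function\","
                + "\"className\":\"com.example.MyFunction\","
                + "\"inputs\":[\"persistent://public/default/in\"]}";
        HttpRequest request = buildUpdate("http://localhost:8080", "public", "default",
                "my-function", "https://example.com/artifacts/my-function.jar", configJson);
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

In a CI job, the pipeline would only need to publish the jar somewhere Pulsar can download it and then issue this request, which avoids going through the CLI that Devin found slow.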
----
2020-02-21 05:44:11 UTC - Tamer: I was running 2.4.2 but have now upgraded to 2.5.0; I will test again once I have the CI/CD working.
----
2020-02-21 05:44:23 UTC - Devin G. Bost: Sounds good. Thanks.
----
2020-02-21 05:44:30 UTC - Devin G. Bost: Please keep us posted.
----
2020-02-21 05:47:02 UTC - Tamer: Thanks for the detailed answers.

I am actually helping 3 clients migrate to Pulsar at the moment, and all of them needed this CI/CD for Pulsar functions. Two of them use Jenkins and the other uses GitLab Runner.

I will definitely share my updates and will write a blog post at the end.
----
2020-02-21 05:47:29 UTC - Devin G. Bost: Sounds great!
----
2020-02-21 06:38:45 UTC - Pushkar Sawant: @Pushkar Sawant has joined the channel
----
2020-02-21 06:45:56 UTC - Pushkar Sawant: Hi,
Has anyone experienced an issue where an entire write quorum has gone down? What’s the best course of action to recover from such a failure?
----
2020-02-21 07:36:40 UTC - tuteng: I think you can try to consume it using the KeyValue schema.
```byte[] encodeBytes = Schema.KV_BYTES().encode(new KeyValue<>(fooBytes, barBytes));
KeyValue<byte[], byte[]> decodeKV = Schema.KV_BYTES().decode(encodeBytes);```
----
2020-02-21 07:38:50 UTC - tuteng: See <https://github.com/apache/pulsar/blob/master/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java#L364>, then convert the bytes to JSON.
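To make the "convert the bytes to JSON" step concrete, here is a stdlib-only Java sketch that splits a KeyValue payload by hand. It assumes the INLINE encoding as I understand Pulsar 2.5's `KeyValue.encode`: a 4-byte big-endian key length, the key bytes, a 4-byte big-endian value length, then the value bytes. If that holds, printing the whole payload with `new String(...)` would put non-printable length bytes between the key JSON and the value JSON, which may be what the `�` in the raw dump above is. In real code `Schema.KV_BYTES().decode(...)` does the splitting; `InlineKeyValueDecoder` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class InlineKeyValueDecoder {

    // Hypothetical holder for the two decoded halves as UTF-8 text.
    public static final class Decoded {
        public final String key;
        public final String value;

        Decoded(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Assumed layout: [int keyLen][key bytes][int valueLen][value bytes],
    // ints in big-endian order (ByteBuffer's default).
    public static Decoded decode(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        String key = readPart(buf);
        String value = readPart(buf);
        return new Decoded(key, value);
    }

    private static String readPart(ByteBuffer buf) {
        int len = buf.getInt();
        if (len < 0) {
            return null; // assumption: a negative length marks an absent part
        }
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Builds a payload in the same assumed layout, handy for testing.
    public static byte[] encode(String key, String value) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] v = value.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + k.length + 4 + v.length)
                .putInt(k.length).put(k)
                .putInt(v.length).put(v)
                .array();
    }

    public static void main(String[] args) {
        byte[] raw = encode("{\"id\":388}", "{\"op\":\"d\"}");
        Decoded kv = decode(raw);
        System.out.println(kv.key);   // {"id":388}
        System.out.println(kv.value); // {"op":"d"}
    }
}
```

Once the value is a JSON string, the row fields sit under `payload.before` / `payload.after` of the Debezium envelope; pulling them out needs a JSON library such as Jackson, which this sketch leaves out.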
----
2020-02-21 08:15:12 UTC - Steven Op de beeck: @Devin G. Bost Thanks for your response. What I mean by overhead is this: <https://gist.github.com/stevenodb/efa02194d911b185c2e878f0218de736>
Whereas what we are actually interested in is this small part:
```{
      "id": 27,
      "aggregate_id": 27,
      "aggregate_type": "Employee",
      "content": "{\"name\":\"robin\"}",
      "event_type": "EmployeeUpdated",
      "timestamp": 58256225452041
}```

----
2020-02-21 08:52:44 UTC - Steven Op de beeck: @tuteng Thanks, I will give that a try.
----