Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2019/05/11 09:11:03 UTC

Slack digest for #general - 2019-05-11

2019-05-10 11:04:32 UTC - bhagesharora: Hello everyone,
For the message retention policy, I used the subcommand to set a size limit and a time limit on the namespace public/default:

bin/pulsar-admin namespaces set-retention public/default --size 5M --time 5m

Later I published messages and consumed them using the Python client API.
I acknowledged all the messages using consumer.acknowledge_cumulative(msg), then I closed the consumer and client.
Then I started the client and consumer again and tried to receive the acknowledged messages using msg = consumer.receive().
But no messages are coming through.
----
2019-05-10 11:11:08 UTC - Kim Christian Gaarder: Here is one possible reason, if you used the same subscription name the first and second time you consumed: if you did not unsubscribe but only closed the consumer, the subscription lives on server-side under the same name and remembers where you are in the stream. You need to unsubscribe before using the same subscription name again, or use a different subscription name when you start a consumer expecting messages from the beginning.
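A minimal Java sketch of both options (the Python client has equivalent calls); the service URL, topic, and subscription names here are hypothetical:
```
import org.apache.pulsar.client.api.*;

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/my-topic")
        .subscriptionName("my-sub")
        .subscribe();

// Option 1: drop the server-side cursor so a later subscribe under
// the same name starts as a brand-new subscription
consumer.unsubscribe();

// Option 2: subscribe under a new name; note that a new subscription
// starts at the *latest* message by default, so ask for Earliest
// to replay retained messages
Consumer<byte[]> fresh = client.newConsumer()
        .topic("persistent://public/default/my-topic")
        .subscriptionName("my-sub-2")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
```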
----
2019-05-10 11:30:51 UTC - bhagesharora: I tried it both ways: first using consumer.unsubscribe(), and then changing the subscription name. But it's still not working.
----
2019-05-10 12:00:56 UTC - Brian Doran: `PulsarKafkaProducer` does not support partitionsFor, any particular reason? In our Kafka implementation we provide the ability to target specific partitions:
`producer.send(new ProducerRecord<>(topic, partitionIndex, null, data));`

This can't be done using `PulsarKafkaProducer`?
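For comparison, the native Pulsar client can target a specific partition with a custom MessageRouter; a minimal sketch (native API, not the Kafka wrapper; given a PulsarClient `client`, with a hypothetical topic name and partition index):
```
import org.apache.pulsar.client.api.*;

// Route every message to a fixed partition index
Producer<byte[]> producer = client.newProducer()
        .topic("persistent://public/default/partitioned-topic")
        .messageRoutingMode(MessageRoutingMode.CustomPartition)
        .messageRouter(new MessageRouter() {
            @Override
            public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                return 1 % metadata.numPartitions();  // hypothetical target partition
            }
        })
        .create();
```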
----
2019-05-10 15:43:26 UTC - Ke Li: @Ke Li has joined the channel
----
2019-05-10 16:17:32 UTC - Shivji Kumar Jha: I seem to have hit an issue with pulsar-flink. In <https://github.com/apache/pulsar/pull/4232> I added ClientConfigurationData, but I just realised that doesn't contain AuthenticationDataProvider.
If we set Authentication instead, pulsar-flink fails saying AuthenticationToken has a lambda that is not serializable. Is there a workaround for this?
----
2019-05-10 16:21:14 UTC - Shivji Kumar Jha: I also tried adding this to ClientConfigurationData
```private AuthenticationDataProvider authenticationDataProvider;```

And then
```
ClientConfigurationData clientConfig = new ClientConfigurationData();
Supplier<String> tokenSupplier = () -> PULSAR_AUTH_TOKEN;
clientConfig.setAuthenticationDataProvider(new AuthenticationDataToken(tokenSupplier));
```
This too gives the same error when I try to add the source to the Flink env:
```
 DataStream dataStream = env.addSource(src).uid("testStream");
```

Error
```
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.connectors.pulsar.partitioner.examples.SerializbleBug$$Lambda$1/1274370218
```
----
2019-05-10 16:21:35 UTC - Shivji Kumar Jha: @Matteo Merli ^^
----
2019-05-10 17:36:53 UTC - Nicolas Ha: Trying to use SQL. I can create a producer, but then I get an NPE:
```
19-05-10 17:38:15 nhas-MBP INFO [org.apache.pulsar.client.impl.ProducerImpl:889] - [<persistent://8d1e8886-318d-4821-a898-365653802e80/standalone/subscriptions/2956c5a1-8105-444c-bbaf-dd52528b9fe3>] [null] Creating producer on cnx [id: 0x1de19018, L:/127.0.0.1:55227 - R:localhost/127.0.0.1:6650]
19-05-10 17:38:15 nhas-MBP WARN [org.apache.pulsar.client.impl.ClientCnx:529] - [id: 0x1de19018, L:/127.0.0.1:55227 - R:localhost/127.0.0.1:6650] Received error from server: java.lang.NullPointerException
19-05-10 17:38:15 nhas-MBP ERROR [org.apache.pulsar.client.impl.ProducerImpl:970] - [<persistent://8d1e8886-318d-4821-a898-365653802e80/standalone/subscriptions/2956c5a1-8105-444c-bbaf-dd52528b9fe3>] [null] Failed to create producer: java.lang.NullPointerException```
What does this usually point to?
----
2019-05-10 17:56:42 UTC - Matteo Merli: @Kim Christian Gaarder on (1) the total ordering is guaranteed “per-partition”
----
2019-05-10 17:58:15 UTC - Matteo Merli: If you need strong ordering you need to either:
 * Use 1 single partition
 * Provide a key on the messages so that messages with the same key go to the same partition. With that you get “per-key” ordering (see the sketch below)
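A minimal Java sketch of the keyed option, reusing the client from the earlier sketch; the topic name and key are hypothetical:
```
import org.apache.pulsar.client.api.*;

Producer<byte[]> producer = client.newProducer()
        .topic("persistent://public/default/orders")
        .create();

// Messages with the same key hash to the same partition,
// so ordering is preserved per key
producer.newMessage()
        .key("customer-42")
        .value("order created".getBytes())
        .send();
```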
----
2019-05-10 18:41:56 UTC - Joe Francis: You have to explicitly rewind. By default a sub will start delivery (1) on an existing sub, where it left off, or (2) on a new sub, at any message that gets published after subscribing.
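A minimal Java sketch of an explicit rewind (reusing a client built as in the earlier sketches; topic and subscription names are hypothetical):
```
import org.apache.pulsar.client.api.*;

Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/my-topic")
        .subscriptionName("my-sub")
        .subscribe();

// Move the subscription cursor back to the earliest retained message
consumer.seek(MessageId.earliest);
```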
----
2019-05-10 20:05:39 UTC - Sanjeev Kulkarni: Where are you seeing this exception? @Nicolas Ha 
----
2019-05-10 20:05:46 UTC - Sanjeev Kulkarni: On the sql worker?
----
2019-05-10 20:28:13 UTC - Nicolas Ha: in my own code's console
----
2019-05-10 20:37:36 UTC - Sanjeev Kulkarni: can you run bin/pulsar sql to get a Presto shell and see if that works?
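For reference, a sketch of opening the Presto shell and listing tables (the namespace here is hypothetical):
```
$ ./bin/pulsar sql
presto> show tables in pulsar."public/default";
```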
----
2019-05-10 20:44:17 UTC - Matteo Merli: @Shivji Kumar Jha Unfortunately I don’t think there’s an easy solution.

The authentication was kept out of ClientConfigurationData precisely because of the serialization problem. It's not just the lambda; it's that it's plugin-specific too.

We could pass the auth config as a “string” map, which can then be passed to the client builder (`authPluginClassName` and `authParams`).

eg:
```
authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams=token:XXXXXXXXXX
```
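A sketch of feeding those two strings to the client builder after deserialization (this uses the standard client API; the wiring through pulsar-flink itself would still need to be built):
```
import org.apache.pulsar.client.api.*;

// Rebuild the Authentication plugin from plain strings instead of
// serializing the plugin object (or its lambda) itself
String authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
String authParams = "token:XXXXXXXXXX";

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .authentication(AuthenticationFactory.create(authPluginClassName, authParams))
        .build();
```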
----
2019-05-10 21:00:57 UTC - Nicolas Ha: aha that was it, I was starting it wrong
----
2019-05-10 21:01:10 UTC - Nicolas Ha: Thanks!
----
2019-05-10 21:19:01 UTC - Nicolas Ha: For <https://pulsar.apache.org/docs/en/sql-getting-started/>
Instead of ```producer.newMessage().value(foo).send();```
Can I pass something other than `foo`? In my case I have the Avro schema (which may change) but I don't have a Java POJO to instantiate
----
2019-05-10 21:20:30 UTC - Matteo Merli: You mean you have bytes already, along with a schema def?
----
2019-05-10 21:22:34 UTC - Nicolas Ha: I suppose I could encode the avro data myself, yes
----
2019-05-10 21:23:07 UTC - Matteo Merli: There’s the option of AUTO_PRODUCE schema 
----
2019-05-10 21:23:20 UTC - Matteo Merli: Which will validate against the set schema
----
2019-05-10 21:23:36 UTC - Nicolas Ha: I’ll look it up, thanks :slightly_smiling_face:
----
2019-05-10 21:25:58 UTC - Nicolas Ha: `AUTO_PRODUCE_BYTES` right? Looks like what I was after :smile:
----
2019-05-10 21:27:34 UTC - Matteo Merli: Yes
----
2019-05-10 21:28:04 UTC - Matteo Merli: Otherwise you should also be able to generate the POJO from the Avro schema with Maven
female_mage : Nicolas Ha
----
2019-05-10 21:29:19 UTC - Nicolas Ha: Sounds good either way. I guess this works because the Avro schema is sent with the Avro-encoded message then?
----
2019-05-10 21:30:01 UTC - Nicolas Ha: like this it seems - <https://github.com/apache/pulsar/blob/9c0937b85da38d25d6b0dbbcc2a58b0178dbf09f/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java#L580-L586> I should have enough to play, Thanks!
----
2019-05-10 21:31:30 UTC - Matteo Merli: Yes, if you have the data already encoded (eg: reading bytes from a file) and you have the schema, then you wouldn’t need the pojo
slightly_smiling_face : Nicolas Ha
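A minimal Java sketch of AUTO_PRODUCE_BYTES along the lines of the linked test, reusing a client from the earlier sketches; the topic name and the byte source are hypothetical:
```
import org.apache.pulsar.client.api.*;

// The producer sends pre-encoded bytes; the broker validates them
// against the schema already registered on the topic
Producer<byte[]> producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
        .topic("persistent://public/default/avro-topic")
        .create();

byte[] avroEncoded = readAvroBytes();  // hypothetical helper returning Avro-encoded bytes
producer.newMessage().value(avroEncoded).send();
```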
----