Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/04/01 06:00:00 UTC
[GitHub] [pulsar] casidiablo opened a new pull request #3955: [Flink] Allow to customize PulsarProducer
URL: https://github.com/apache/pulsar/pull/3955
This is an improvement over #3894.
Flink serializes function instances before it instantiates them on
the workers, so instead of passing a custom `PulsarProducer` client
we need to pass an object that is serializable. The current
implementation always falls back to calling `createProducer()`:
`producer` is `transient`, so it is always null when Flink creates
new instances of the sink.
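To illustrate the point about `transient`, here is a minimal sketch that uses plain Java serialization and no Flink or Pulsar classes. `FakeProducer`, `SketchSink`, and `roundTrip` are hypothetical names invented for this example; the actual sink in this PR may be structured differently.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical stand-in for a non-serializable client such as a Pulsar producer.
class FakeProducer(val settings: Map<String, Any>)

// Hypothetical sink: the live client is transient, the configuration map is serialized.
class SketchSink(private val producerConfig: Map<String, Any>) : Serializable {
    @Transient
    private var producer: FakeProducer? = null

    // Assigns a client before serialization, as a caller passing a custom producer would.
    fun presetProducer(p: FakeProducer) { producer = p }

    fun hasProducer(): Boolean = producer != null

    // Builds the client from the serializable configuration if none is present.
    fun open(): FakeProducer {
        val p = producer ?: FakeProducer(producerConfig)
        producer = p
        return p
    }
}

// Round-trips an object through Java serialization, roughly what happens
// when a function instance is shipped to a worker.
@Suppress("UNCHECKED_CAST")
fun <T : Serializable> roundTrip(value: T): T {
    val buffer = ByteArrayOutputStream()
    ObjectOutputStream(buffer).use { it.writeObject(value) }
    return ObjectInputStream(ByteArrayInputStream(buffer.toByteArray())).use { it.readObject() as T }
}

fun main() {
    val sink = SketchSink(mapOf("blockIfQueueFull" to true))
    sink.presetProducer(FakeProducer(emptyMap()))

    val copy = roundTrip(sink)
    println(copy.hasProducer())                       // false: the transient field was dropped
    println(copy.open().settings["blockIfQueueFull"]) // true: the config map survived
}
```

The preset client is lost in the round trip, while the configuration map arrives intact, which is why a serializable configuration object is the safer thing to pass.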
More context on why I need this:
I'm reading from a Kafka stream and writing into a Pulsar cluster.
The source stream produces so much data that my jobs usually fail
with: `Producer send queue is full`.
I was able to solve this by enabling `blockIfQueueFull`:
```kotlin
return FlinkPulsarProducer<ByteArray>(
    pulsarServerUrl,
    outputTopic,
    serializationSchema,
    PulsarKeyExtractor { UUID.randomUUID().toString() },
    mapOf("blockIfQueueFull" to true)) // <- this is the new, optional constructor param
```
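As a rough analogy for what `blockIfQueueFull` changes, the sketch below uses a bounded `ArrayBlockingQueue` from the JDK. It is not the Pulsar client's implementation; the queue, the function names, and the capacity of two are assumptions made purely for illustration.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Non-blocking send: offer() returns false for a message rejected because the
// queue is full, comparable to a send failing with a "queue is full" error.
fun sendWithoutBlocking(pending: ArrayBlockingQueue<String>, messages: List<String>): List<Boolean> =
    messages.map { pending.offer(it) }

// Blocking send: put() waits until a slot is free, so a fast source is slowed
// down instead of failing.
fun sendBlocking(pending: ArrayBlockingQueue<String>, message: String) {
    pending.put(message)
}

fun main() {
    // Hypothetical pending-message queue with room for two entries.
    val pending = ArrayBlockingQueue<String>(2)

    println(sendWithoutBlocking(pending, listOf("msg-1", "msg-2", "msg-3"))) // [true, true, false]

    // Simulates the broker acknowledging one message after a short delay.
    val consumer = thread {
        Thread.sleep(100)
        pending.take()
    }
    sendBlocking(pending, "msg-3") // waits until the consumer frees a slot
    consumer.join()
    println(pending.size) // 2
}
```

Blocking trades throughput for reliability: the writer applies backpressure to the upstream source rather than failing the job.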