Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/04/01 06:00:00 UTC

[GitHub] [pulsar] casidiablo opened a new pull request #3955: [Flink] Allow to customize PulsarProducer

URL: https://github.com/apache/pulsar/pull/3955
 
 
   This is an improvement over #3894.
   
   Because of how Flink instantiates functions, we cannot pass a custom
   `PulsarProducer` client directly; we need to pass an object that is
   serializable. The current implementation always falls back to
   calling `createProducer()` because `producer` is `transient`, so it is
   always null when Flink creates new instances of the sink.
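   
   To illustrate the `transient` behaviour described above, here is a
   minimal, self-contained sketch using plain Java serialization. It does
   not use the real connector classes: `FakeSink`, `FakeProducer` and
   `roundTrip` are hypothetical stand-ins made up for this example, and the
   only thing assumed about the real sink is what is stated above (a
   `transient` producer field that is rebuilt when it is null).
   
   ```kotlin
   import java.io.ByteArrayInputStream
   import java.io.ByteArrayOutputStream
   import java.io.ObjectInputStream
   import java.io.ObjectOutputStream
   import java.io.Serializable

   // Hypothetical stand-in for a non-serializable client such as a Pulsar producer.
   class FakeProducer(val blockIfQueueFull: Boolean)

   // Hypothetical stand-in for the sink: the config is serialized, the producer is not.
   class FakeSink(private val producerConfig: HashMap<String, Any>) : Serializable {
       @Transient
       var producer: FakeProducer? = null

       // Rebuilds the producer from the serialized config when none is present.
       fun open() {
           if (producer == null) {
               producer = FakeProducer(producerConfig["blockIfQueueFull"] == true)
           }
       }
   }

   // Round-trips a value through Java serialization, which is also how
   // function instances get copied before they run on a worker.
   @Suppress("UNCHECKED_CAST")
   fun <T : Serializable> roundTrip(value: T): T {
       val bytes = ByteArrayOutputStream()
       ObjectOutputStream(bytes).use { it.writeObject(value) }
       return ObjectInputStream(ByteArrayInputStream(bytes.toByteArray())).use {
           it.readObject() as T
       }
   }

   fun main() {
       val sink = FakeSink(hashMapOf("blockIfQueueFull" to true))
       sink.producer = FakeProducer(blockIfQueueFull = false) // custom instance set up front

       val shipped = roundTrip(sink)
       println(shipped.producer) // prints "null": the transient field was dropped

       shipped.open()
       println(shipped.producer?.blockIfQueueFull) // prints "true": rebuilt from the config
   }
   ```
   
   The custom instance set before the round trip is lost, while the
   serializable config survives, which is why this change passes
   configuration rather than a ready-made producer.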
   
   More context on why I need this:
   
   I'm reading from a Kafka stream and writing into a Pulsar cluster.
   The source stream produces so much data that my jobs usually
   fail with: `Producer send queue is full`.
   
   I was able to solve this by enabling `blockIfQueueFull`:
   
   ```kotlin
     return FlinkPulsarProducer<ByteArray>(
         pulsarServerUrl,
         outputTopic,
         serializationSchema,
         PulsarKeyExtractor { UUID.randomUUID().toString() },
         mapOf("blockIfQueueFull" to true)) // <- this is the new, optional constructor param
   ```
