Posted to user@flink.apache.org by Shahid Chohan <ch...@stripe.com> on 2019/10/07 22:42:34 UTC

Kafka producer throws exceptions when publishing a keyed stream

Hello,

I am running into issues writing a keyed stream from sink subtasks to an
output kafka topic.

The job is of the form: source -> filter -> keyby(id) -> flatmap -> sink
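
For concreteness, a minimal sketch of that topology (the Event type, its
accessors, the schemas, EnrichmentFunction, and the two Properties
objects are hypothetical placeholders, not the actual job code):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    // consumerProps / producerProps: Kafka client Properties, assumed defined
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(new FlinkKafkaConsumer<>(
            "input-topic", new EventDeserializationSchema(), consumerProps))
        .filter(event -> event.isValid())
        .keyBy(event -> event.getId())        // keyby(id)
        .flatMap(new EnrichmentFunction())    // hypothetical flatmap logic
        .addSink(new FlinkKafkaProducer<>(
            "mytopic", new EventSerializationSchema(), producerProps));
    env.execute();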

The exceptions are coming from the kafka producer and cause checkpointing
to timeout:

   - FlinkKafkaException: Failed to send data to Kafka: Failed to allocate
   memory within the configured max blocking time
   - FlinkKafkaException: Failed to send data to Kafka: Expiring 16
   record(s) for mytopic-11:120000 ms has passed since batch creation
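
If I'm reading the Kafka client behavior correctly, the first error
means the producer's buffer.memory pool stayed full for max.block.ms,
and the second means a batch sat unsent longer than the delivery
timeout (the 120000 ms in the message matches the delivery.timeout.ms
default on newer clients). A sketch of the relevant knobs, with purely
illustrative values:

    import java.util.Properties;

    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "broker:9092"); // hypothetical
    // "Failed to allocate memory": the accumulator pool (buffer.memory,
    // default 32 MB) was full for max.block.ms (default 60 s).
    producerProps.setProperty("buffer.memory", "67108864");
    producerProps.setProperty("max.block.ms", "120000");
    // "Expiring N record(s) ... since batch creation": batches older than
    // this are dropped (delivery.timeout.ms, Kafka client >= 2.1).
    producerProps.setProperty("delivery.timeout.ms", "180000");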

The job gets into a crashloop with the above exceptions, occasionally
recovering for a short while before crashlooping again. I believe the
problem is that I'm using the keys to determine the output partitions,
which causes each of the P sink subtasks to fan out writes across all N
output partitions. Ideally, each subtask would only write to a single
partition.

The job has the following constraints/properties:
#1: once a key has been written to an output kafka topic partition, it
needs to always be written to the same kafka partition in the future
#2: the sink subtask parallelism will initially equal the number of output
partitions
#3: I should be able to increase the parallelism in the future without
violating #1
#4: the output kafka topic will never add new partitions
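
To make #1 and #3 concrete, the kind of thing I have in mind is a
partitioner that derives the partition from the key alone, so the
mapping is independent of which subtask happens to hold the key. This
is only a sketch, not working code; KeyedRecord and getKey() are
hypothetical stand-ins for my record type:

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    public class KeyHashPartitioner extends FlinkKafkaPartitioner<KeyedRecord> {
        @Override
        public int partition(KeyedRecord record, byte[] key, byte[] value,
                             String targetTopic, int[] partitions) {
            // Non-negative, parallelism-independent hash of the key, mod
            // the (fixed, per #4) partition count. Satisfies #1 and #3.
            int hash = record.getKey().hashCode() & Integer.MAX_VALUE;
            return partitions[hash % partitions.length];
        }
    }

(Something like this could be passed to the FlinkKafkaProducer
constructor that takes an Optional<FlinkKafkaPartitioner>.) But this
still leaves each subtask fanning out to many partitions, which is the
behavior I suspect is causing the producer errors above.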

If parallelism == partitions, then I believe the FlinkFixedPartitioner
would be a fine solution. However, I don't think it would respect the
original key->partition mapping if I later increased parallelism, since
it chooses the output partition using this scheme:
<https://github.com/apache/flink/blob/76763da007cec27ba43396fc2df28e0d6aaf4c37/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java#L75>
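
For reference, that line is essentially:

    return partitions[parallelInstanceId % partitions.length];

i.e. subtask i always writes to partition i mod N. So with 4 partitions
and parallelism 4, a key handled by subtask 1 lands on partition 1, but
if I rescale to parallelism 6, that key may move to, say, subtask 3 and
from then on be written to partition 3, violating #1.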

Is there a technique I could use here to satisfy these constraints?
Possibly a tweak to the kafka producer's settings, another method for
partitioning the keyed stream, or something else?

Thanks.