Posted to dev@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2016/01/20 11:49:40 UTC

[jira] [Created] (FLINK-3264) Add load shedding policy into Kafka Consumers

Robert Metzger created FLINK-3264:
-------------------------------------

             Summary: Add load shedding policy into Kafka Consumers
                 Key: FLINK-3264
                 URL: https://issues.apache.org/jira/browse/FLINK-3264
             Project: Flink
          Issue Type: Improvement
          Components: Kafka Connector
            Reporter: Robert Metzger


There are situations in which Flink's Kafka Consumer is not able to consume everything produced into a topic, for example when a single Flink instance is subscribed to a busy Kafka topic (see this user request: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Frequent-exceptions-killing-streaming-job-td4323.html ).

I think we should allow users to control the behavior of the Kafka consumer in those situations.

I had an offline discussion with [~StephanEwen] about this and we think that allowing users to pass a LoadSheddingPolicy to the KafkaConsumer would be the best solution.
In the policy, users can define how frequently the consumer requests the latest offsets of the subscribed partitions. The requests can be based either on time (every n ms) or on record count (every n-th record). The policy can then decide to skip a certain number of offsets (maybe even jump straight to the latest offset).
With the "offset skipping" approach, we avoid fetching records we cannot process anyway.
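
To make the idea more concrete, here is a rough sketch of what such a policy could look like. Nothing in it exists in Flink or Kafka today: the {{LoadSheddingPolicy}} interface, its two methods, and the {{MaxLagPolicy}} example are hypothetical names made up for illustration. It only covers the record-count variant (check every n-th record); a time-based variant would need an additional method.

```java
/** Hypothetical interface, not an existing Flink or Kafka API. */
interface LoadSheddingPolicy {

    /** Number of consumed records after which the consumer asks for the latest offset again. */
    int checkEveryNRecords();

    /**
     * Decides where to continue reading in one partition.
     *
     * @param currentOffset next offset the consumer would fetch
     * @param latestOffset  latest offset reported for the partition
     * @return the offset to fetch next; returning currentOffset means "skip nothing"
     */
    long nextFetchOffset(long currentOffset, long latestOffset);
}

/** Example policy (an assumption, for illustration): never fall more than maxLag records behind. */
final class MaxLagPolicy implements LoadSheddingPolicy {

    private final int checkInterval;
    private final long maxLag;

    MaxLagPolicy(int checkInterval, long maxLag) {
        if (checkInterval <= 0 || maxLag < 0) {
            throw new IllegalArgumentException("checkInterval must be > 0 and maxLag must be >= 0");
        }
        this.checkInterval = checkInterval;
        this.maxLag = maxLag;
    }

    @Override
    public int checkEveryNRecords() {
        return checkInterval;
    }

    @Override
    public long nextFetchOffset(long currentOffset, long latestOffset) {
        long lag = latestOffset - currentOffset;
        // Skip ahead only if we are too far behind; otherwise keep reading in order.
        return lag > maxLag ? latestOffset - maxLag : currentOffset;
    }
}

class LoadSheddingDemo {
    public static void main(String[] args) {
        LoadSheddingPolicy policy = new MaxLagPolicy(1000, 500);
        System.out.println(policy.nextFetchOffset(100, 400));    // lag 300: prints 100 (no skip)
        System.out.println(policy.nextFetchOffset(100, 10_000)); // lag 9900: prints 9500 (skip ahead)
    }
}
```

A policy with maxLag = 0 would correspond to the "set to the latest offset" case mentioned above. The consumer itself would still be responsible for looking up the latest offset and for applying the returned fetch offset.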

In the 0.9 consumer, there doesn't seem to be an API for requesting the latest offset of a topicPartition. I'll ask on the Kafka ML what the status is there.
With {{seek()}} we can fetch from any offset.

In the 0.8 SimpleConsumer, there is a method for requesting the offsets:
{code}
// requestInfo maps each TopicAndPartition to a PartitionOffsetRequestInfo
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
OffsetResponse response = consumer.getOffsetsBefore(request);
{code}
The fetch offset is controlled within the {{LegacyFetcher}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)