You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Tomáš Lach <la...@gmail.com> on 2018/01/26 08:34:14 UTC

Kafka-streams API skipping offsets

Hello, i have following problem with kafka-streams scala app and Exactly
once delivery quarantee:



Topic filled with kafka-streams app(exactly once enabled) has wrong ending
offset. Broker and streams API version 0.11. When i run
*kafka.tools.GetOffsetShell*, it gives ending offset 17, but in topic there
are just 12 messages (retention is disabled). When exactly once guarantee
is disabled, these offsets are matching. I tried to reset kafka-streams
according to this
<https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/>,
but problem still remains.



When i run *SimpleConsumerShell* with *--print-offsets* option, output is
folowing:



next offset = 1

{"timestamp": 149583551238149, "data": {...}}

next offset = 2

{"timestamp": 149583551238149, "data": {...}}

next offset = 4

{"timestamp": 149583551238149, "data": {...}}

next offset = 5

{"timestamp": 149583551238149, "data": {...}}

next offset = 7

{"timestamp": 149583551238149, "data": {...}}

next offset = 8

{"timestamp": 149583551238149, "data": {...}}

...



Some offsets are apparently skipped when exactly-once dellivery guarantee
is enabled. I’m reading from this topic with spark, and it is causing these
errors: “*assertion failed*: *Ran out of messages before reaching ending
offset 17 for topic offset-test partition 0 start 0. This should not
happen, and indicates that messages may have been lost” *or* “**Got wrong
record for spark-executor-<group.id <http://group.id>> <topic> 0 even after
seeking to offset” – *depend on version of spark-streaming-kafka lib. We're
running Cloudera distribution of Kafka.



My kafka streams app config is following:



        application.id = MultipleTopicsAppTest

        application.server =

        bootstrap.servers = [host:9092]

        buffered.records.per.partition = 1000

        cache.max.bytes.buffering = 10485760

        client.id =

        commit.interval.ms = 100

        connections.max.idle.ms = 540000

        default.deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailExceptionHandler

        default.key.serde = class
org.apache.kafka.common.serialization.Serdes$StringSerde

        default.timestamp.extractor = class
org.apache.kafka.streams.processor.FailOnInvalidTimestamp

        default.value.serde = class
org.apache.kafka.common.serialization.Serdes$StringSerde

        key.serde = null

        metadata.max.age.ms = 300000

        metric.reporters = []

        metrics.num.samples = 2

        metrics.recording.level = INFO

        metrics.sample.window.ms = 30000

        num.standby.replicas = 0

        num.stream.threads = 1

        partition.grouper = class
org.apache.kafka.streams.processor.DefaultPartitionGrouper

        poll.ms = 100

        processing.guarantee = exactly_once

        receive.buffer.bytes = 32768

        reconnect.backoff.max.ms = 1000

        reconnect.backoff.ms = 50

        replication.factor = 1

        request.timeout.ms = 40000

        retry.backoff.ms = 100

        rocksdb.config.setter = null

        security.protocol = PLAINTEXT

        send.buffer.bytes = 131072

        state.cleanup.delay.ms = 600000

        state.dir = /tmp/kafka-streams

        timestamp.extractor = null

        value.serde = null

        windowstore.changelog.additional.retention.ms = 86400000

        zookeeper.connect =



What can cause this problem? Thanks!

Re: Kafka-streams API skipping offsets

Posted by "Matthias J. Sax" <ma...@confluent.io>.
That sound like expected. Note, if you use exactly-once, on commit/abort
of a transaction, the broker write a commit/abort message into the
partitions. Those markers fill up one message/record slot.

Thus, you have "gaps" in the message offsets and in you case, I assume
that you have 5 commit markers in the corresponding partition.


-Matthias

On 1/26/18 12:34 AM, Tomáš Lach wrote:
> Hello, i have following problem with kafka-streams scala app and Exactly
> once delivery quarantee:
> 
> 
> 
> Topic filled with kafka-streams app(exactly once enabled) has wrong ending
> offset. Broker and streams API version 0.11. When i run
> *kafka.tools.GetOffsetShell*, it gives ending offset 17, but in topic there
> are just 12 messages (retention is disabled). When exactly once guarantee
> is disabled, these offsets are matching. I tried to reset kafka-streams
> according to this
> <https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/>,
> but problem still remains.
> 
> 
> 
> When i run *SimpleConsumerShell* with *--print-offsets* option, output is
> folowing:
> 
> 
> 
> next offset = 1
> 
> {"timestamp": 149583551238149, "data": {...}}
> 
> next offset = 2
> 
> {"timestamp": 149583551238149, "data": {...}}
> 
> next offset = 4
> 
> {"timestamp": 149583551238149, "data": {...}}
> 
> next offset = 5
> 
> {"timestamp": 149583551238149, "data": {...}}
> 
> next offset = 7
> 
> {"timestamp": 149583551238149, "data": {...}}
> 
> next offset = 8
> 
> {"timestamp": 149583551238149, "data": {...}}
> 
> ...
> 
> 
> 
> Some offsets are apparently skipped when exactly-once dellivery guarantee
> is enabled. I’m reading from this topic with spark, and it is causing these
> errors: “*assertion failed*: *Ran out of messages before reaching ending
> offset 17 for topic offset-test partition 0 start 0. This should not
> happen, and indicates that messages may have been lost” *or* “**Got wrong
> record for spark-executor-<group.id <http://group.id>> <topic> 0 even after
> seeking to offset” – *depend on version of spark-streaming-kafka lib. We're
> running Cloudera distribution of Kafka.
> 
> 
> 
> My kafka streams app config is following:
> 
> 
> 
>         application.id = MultipleTopicsAppTest
> 
>         application.server =
> 
>         bootstrap.servers = [host:9092]
> 
>         buffered.records.per.partition = 1000
> 
>         cache.max.bytes.buffering = 10485760
> 
>         client.id =
> 
>         commit.interval.ms = 100
> 
>         connections.max.idle.ms = 540000
> 
>         default.deserialization.exception.handler = class
> org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> 
>         default.key.serde = class
> org.apache.kafka.common.serialization.Serdes$StringSerde
> 
>         default.timestamp.extractor = class
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> 
>         default.value.serde = class
> org.apache.kafka.common.serialization.Serdes$StringSerde
> 
>         key.serde = null
> 
>         metadata.max.age.ms = 300000
> 
>         metric.reporters = []
> 
>         metrics.num.samples = 2
> 
>         metrics.recording.level = INFO
> 
>         metrics.sample.window.ms = 30000
> 
>         num.standby.replicas = 0
> 
>         num.stream.threads = 1
> 
>         partition.grouper = class
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
> 
>         poll.ms = 100
> 
>         processing.guarantee = exactly_once
> 
>         receive.buffer.bytes = 32768
> 
>         reconnect.backoff.max.ms = 1000
> 
>         reconnect.backoff.ms = 50
> 
>         replication.factor = 1
> 
>         request.timeout.ms = 40000
> 
>         retry.backoff.ms = 100
> 
>         rocksdb.config.setter = null
> 
>         security.protocol = PLAINTEXT
> 
>         send.buffer.bytes = 131072
> 
>         state.cleanup.delay.ms = 600000
> 
>         state.dir = /tmp/kafka-streams
> 
>         timestamp.extractor = null
> 
>         value.serde = null
> 
>         windowstore.changelog.additional.retention.ms = 86400000
> 
>         zookeeper.connect =
> 
> 
> 
> What can cause this problem? Thanks!
>