Posted to users@kafka.apache.org by Tomáš Lach <la...@gmail.com> on 2018/01/26 08:34:14 UTC
Kafka-streams API skipping offsets
Hello, I have the following problem with a Kafka Streams Scala app and the
exactly-once delivery guarantee:
A topic filled by the Kafka Streams app (exactly-once enabled) has an
unexpected ending offset. Broker and Streams API are both version 0.11. When
I run *kafka.tools.GetOffsetShell*, it reports ending offset 17, but the
topic contains just 12 messages (retention is disabled). When the
exactly-once guarantee is disabled, the offsets match. I tried to reset the
Kafka Streams application according to this
<https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/>,
but the problem still remains.
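For reference, the same check can be done programmatically with the 0.11
consumer API. A minimal Scala sketch (the group id is hypothetical; topic
and broker are the ones from this post):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object EndOffsetCheck extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "host:9092")
  props.put("group.id", "end-offset-check")  // hypothetical group id
  props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  val tp = new TopicPartition("offset-test", 0)
  // endOffsets() returns the log-end offset -- the same number that
  // kafka.tools.GetOffsetShell reports for the latest offset
  val end = consumer.endOffsets(Collections.singleton(tp)).get(tp)
  println(s"log-end offset of $tp = $end")  // prints 17 here, not 12
  consumer.close()
}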
When I run *SimpleConsumerShell* with the *--print-offsets* option, the
output is the following:
next offset = 1
{"timestamp": 149583551238149, "data": {...}}
next offset = 2
{"timestamp": 149583551238149, "data": {...}}
next offset = 4
{"timestamp": 149583551238149, "data": {...}}
next offset = 5
{"timestamp": 149583551238149, "data": {...}}
next offset = 7
{"timestamp": 149583551238149, "data": {...}}
next offset = 8
{"timestamp": 149583551238149, "data": {...}}
...
Some offsets are apparently skipped when the exactly-once delivery guarantee
is enabled. I'm reading from this topic with Spark, and it causes these
errors: "*assertion failed: Ran out of messages before reaching ending
offset 17 for topic offset-test partition 0 start 0. This should not
happen, and indicates that messages may have been lost*" or "*Got wrong
record for spark-executor-<group.id> <topic> 0 even after seeking to
offset*", depending on the version of the spark-streaming-kafka lib. We're
running the Cloudera distribution of Kafka.
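The gaps can also be reproduced without SimpleConsumerShell. A minimal
Scala sketch with the plain 0.11 consumer (hypothetical group id, topic
name from above) prints the same non-contiguous offsets:

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object PrintOffsets extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "host:9092")
  props.put("group.id", "offset-gap-check")   // hypothetical group id
  props.put("auto.offset.reset", "earliest")  // read the topic from the start
  props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singleton("offset-test"))
  val records = consumer.poll(5000L)  // 0.11 still uses poll(timeoutMs: Long)
  for (r <- records.asScala)
    println(s"offset = ${r.offset}, value = ${r.value}")  // 1, 2, 4, 5, 7, ...
  consumer.close()
}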
My Kafka Streams app config is the following:
application.id = MultipleTopicsAppTest
application.server =
bootstrap.servers = [host:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 100
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
key.serde = null
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = exactly_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
timestamp.extractor = null
value.serde = null
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =
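For completeness, a minimal sketch of how such a configuration is typically
assembled in Scala (only the keys relevant here; topology wiring omitted):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val settings = new Properties()
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "MultipleTopicsAppTest")
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092")
// this is the switch that turns on transactional writes
settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
val config = new StreamsConfig(settings)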
What can cause this problem? Thanks!
Re: Kafka-streams API skipping offsets
Posted by "Matthias J. Sax" <ma...@confluent.io>.
That sounds like expected behavior. Note that if you use exactly-once, the
broker writes a commit/abort marker into the partition on commit/abort of a
transaction. Each of those markers fills up one message/record slot. Thus,
you get "gaps" in the message offsets, and in your case I assume that you
have 5 commit markers in the corresponding partition.
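To make the arithmetic explicit (just a back-of-the-envelope sketch, not an
API call):

// log-end offset reported by GetOffsetShell vs. records actually returned
val logEndOffset   = 17L
val visibleRecords = 12L
// every committed/aborted transaction leaves one marker occupying an offset slot
val markers = logEndOffset - visibleRecords
println(s"commit/abort markers in the partition: $markers")  // 5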
-Matthias