Posted to dev@kafka.apache.org by Федор Чернилин <in...@gmail.com> on 2019/03/15 10:28:31 UTC

Kafka Streams Join does not work

Hello! I encountered the following problem. I have 3 brokers and 1
ZooKeeper node, topics with 10 partitions and a replication factor of 3,
and a Streams app with 10 threads, exactly_once processing, and a 1000 ms
commit interval. When I run the stream app, the join of my two topics does
not work for one specific message, although the system works for all other
messages. If I read the topics manually, the messages needed for the join
exist.
What could cause this problem?
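
For context, the topology and configuration look roughly like this (a
minimal sketch; the application id and the outer join are inferred from
the changelog topic name in the logs below, while the bootstrap address,
topic names, serdes, and join window are assumptions for illustration):

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class FlowControlApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "flow-control-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker0:9092"); // assumption
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> ids = builder.stream("topic-a");   // assumed name
        KStream<String, String> keys = builder.stream("topic-b");  // assumed name
        // Windowed outer join of the two input streams:
        ids.outerJoin(keys,
                (l, r) -> l + "|" + r,
                JoinWindows.of(Duration.ofMinutes(5)))
           .to("joined-output");                                   // assumed name
        new KafkaStreams(builder.build(), props).start();
    }
}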

I noticed the following thing. Before the messages were written to the
topic, broker 2 crashed and was restarted. The message in question is
located in partition 1, for which I have the following information:
Topic: fc-id-to-delivery-key-lookup     Partition: 1    Leader: 2
Replicas: 2,0,1 Isr: 0,1,2
Could this be because broker 2 is the leader for partition 1 but the
partition does not actually exist on that broker? Is that possible, and
how can I check it? Or could there be another reason?
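
For reference, here is how the metadata can be checked programmatically (a
minimal sketch with the AdminClient; the bootstrap address is an
assumption):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribePartition {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker0:9092"); // assumption
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin
                .describeTopics(Collections.singletonList("fc-id-to-delivery-key-lookup"))
                .values().get("fc-id-to-delivery-key-lookup")
                .get(); // block until the metadata arrives
            // Print leader, replicas, and ISR for every partition:
            desc.partitions().forEach(p ->
                System.out.printf("partition %d leader=%s replicas=%s isr=%s%n",
                    p.partition(), p.leader(), p.replicas(), p.isr()));
        }
    }
}

If broker 2 is reported as leader for partition 1 but its log.dirs contain
no fc-id-to-delivery-key-lookup-1 directory, the metadata and the on-disk
state disagree.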

Also, before broker 2 was restarted, I saw many errors in the logs, such as:

INFO Opening socket connection to server kafka-zookeeper:2181. Will not
attempt to authenticate using SASL (unknown error)
(org.apache.zookeeper.ClientCnxn)
WARN Session 0x1000b358e830002 for server kafka-zookeeper:2181, unexpected
error, closing socket connection and attempting reconnect
(org.apache.zookeeper.ClientCnxn)
...
ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for
partition __transaction_state-12 at offset 0
(kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This
server does not host this topic-partition.
(the same error appeared for all topics)
...
WARN [SocketServer brokerId=2] Unexpected error from /10.8.1.1; closing
connection (org.apache.kafka.common.network.Selector)

After the restart:

once:
ERROR Could not submit metrics to Kafka topic __confluent.support.metrics:
Failed to construct kafka producer
(io.confluent.support.metrics.BaseMetricsReporter)
...
many times:
ERROR [Broker id=2] Received LeaderAndIsrRequest with correlation id 1 from
controller 1 epoch 1 for partition __consumer_offsets-29 (last update
controller epoch 1) but cannot become follower since the new leader -1 is
unavailable. (state.change.logger)
...
many times:
[2019-03-14 12:55:48,399] ERROR [Broker id=2] Received LeaderAndIsrRequest
with correlation id 1 from controller 1 epoch 1 for partition
flow-control-streams-KSTREAM-OUTEROTHER-0000000039-store-changelog-5 (last
update controller epoch 1) but cannot become follower since the new leader
-1 is unavailable. (state.change.logger)


Thanks

Re: Kafka Streams Join does not work

Posted by Ryanne Dolan <ry...@gmail.com>.
If your producer is using transactions, it's possible that the producer was
killed in the middle of a transaction, in which case any un-committed
records would be logged on the broker but skipped by downstream consumers.

Otherwise, it's likely that the leader for the partition crashed before the
record was replicated to the 2 other brokers.

In either case, downstream consumers would ignore the record as if it never
existed.

As for why the join didn't work: the consumers processing the partition
would ignore the uncommitted record, so Streams would never see it. This
is by design.
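
For illustration, here is roughly how a record can end up on the log
without ever becoming visible to read_committed consumers (a hypothetical
standalone producer; a Streams app manages its transactions internally,
but the effect on the log is the same):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AbortedTxnDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker0:9092"); // assumption
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn");      // assumption
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // The record is appended to the partition log here...
            producer.send(new ProducerRecord<>("some-topic", "key", "value")); // assumed topic
            // ...but if the producer dies before commitTransaction(), or
            // aborts, read_committed consumers will never see the record.
            producer.abortTransaction();
        }
    }
}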

Ryanne

On Fri, Mar 15, 2019 at 10:36 AM Федор Чернилин <in...@gmail.com>
wrote:

> Thanks, I understand how the consumer works. The main question is why the
> join did not work and how it happened that only one message remained
> uncommitted.
>
> On Fri, Mar 15, 2019 at 16:29, Ryanne Dolan <ry...@gmail.com> wrote:
>
> > Hello! When using exactly-once semantics, uncommitted or aborted records
> > are skipped by the consumer as if they don't exist.
> >
> > When inspecting the topic manually, use isolation.level=read_committed to
> > get the same behavior.
> >
> > Ryanne
> >
> > On Fri, Mar 15, 2019, 6:08 AM Федор Чернилин <in...@gmail.com>
> > wrote:
> >
> > > I also noticed another important thing now. The message used for the
> > > join is uncommitted. I discovered this with the consumer setting
> > > isolation.level=read_committed. The message got into the topic via the
> > > same Streams app. Remember that the app has processing.guarantee =
> > > exactly_once. How could this happen?
> > >
> >
>

Re: Kafka Streams Join does not work

Posted by Федор Чернилин <in...@gmail.com>.
Thanks, I understand how the consumer works. The main question is why the
join did not work and how it happened that only one message remained
uncommitted.

On Fri, Mar 15, 2019 at 16:29, Ryanne Dolan <ry...@gmail.com> wrote:

> Hello! When using exactly-once semantics, uncommitted or aborted records
> are skipped by the consumer as if they don't exist.
>
> When inspecting the topic manually, use isolation.level=read_committed to
> get the same behavior.
>
> Ryanne
>
> On Fri, Mar 15, 2019, 6:08 AM Федор Чернилин <in...@gmail.com>
> wrote:
>
> > I also noticed another important thing now. The message used for the
> > join is uncommitted. I discovered this with the consumer setting
> > isolation.level=read_committed. The message got into the topic via the
> > same Streams app. Remember that the app has processing.guarantee =
> > exactly_once. How could this happen?
> >
>

Re: Kafka Streams Join does not work

Posted by Ryanne Dolan <ry...@gmail.com>.
Hello! When using exactly-once semantics, uncommitted or aborted records
are skipped by the consumer as if they don't exist.

When inspecting the topic manually, use isolation.level=read_committed to
get the same behavior.
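
For example (a sketch; the bootstrap address and group id are assumptions):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker0:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "inspector");             // assumption
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // The same isolation level Streams uses internally under exactly_once:
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("fc-id-to-delivery-key-lookup"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> r : records)
                System.out.printf("p%d@%d %s=%s%n", r.partition(), r.offset(), r.key(), r.value());
        }
    }
}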

Ryanne

On Fri, Mar 15, 2019, 6:08 AM Федор Чернилин <in...@gmail.com>
wrote:

> I also noticed another important thing now. The message used for the join
> is uncommitted. I discovered this with the consumer setting
> isolation.level=read_committed. The message got into the topic via the
> same Streams app. Remember that the app has processing.guarantee =
> exactly_once. How could this happen?
>

Re: Kafka Streams Join does not work

Posted by Федор Чернилин <in...@gmail.com>.
I also noticed another important thing now. The message used for the join
is uncommitted. I discovered this with the consumer setting
isolation.level=read_committed. The message got into the topic via the
same Streams app. Remember that the app has processing.guarantee =
exactly_once. How could this happen?