Posted to users@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2021/04/05 23:11:29 UTC

Re: Kafka Streams application startup issues

Hello Mikko,

The issue you saw, where restoration keeps repeating and never completes, is a
bit weird, and without further logs I cannot tell exactly what the root cause is.

At the same time, if your common scenario is an upgrade, you might consider
using static membership to avoid unnecessary rebalances (and hence the
restorations they trigger):

https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
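
With Kafka Streams this boils down to giving each instance a stable id (and
usually a longer session timeout, so a bounced instance can rejoin without
triggering a rebalance). A minimal sketch, assuming a Properties-based setup
and using StreamsConfig.consumerPrefix with the ConsumerConfig constants; the
id and timeout values below are illustrative only:

    Properties props = new Properties();
    // Stable id per instance, reused across restarts of that same instance.
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
              "query-api-us-west-2-node-1");
    // Give a bounced instance time to come back before the broker evicts it
    // (must stay within the broker's group.max.session.timeout.ms).
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 120000);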

Another thing is that, if for some reason (including the rolling bounces)
some rebalances are unavoidable and take time, you can use standby replicas
to keep serving query access:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
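
Concretely, that means one (or more) standby replicas in the config, plus
opting in to stale reads when fetching the store handle. A rough sketch only;
the store name below is a placeholder for whatever you used in
Materialized.as(...):

    // Streams config: keep a warm replica of each store on another instance.
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

    // When querying (Kafka 2.5+): allow serving from standby / restoring stores.
    ReadOnlyKeyValueStore<String, Long> store = streams.store(
        StoreQueryParameters
            .fromNameAndType("prevalence-store", QueryableStoreTypes.<String, Long>keyValueStore())
            .enableStaleStores());

Note that stale reads may return slightly older values, so it is a trade-off
between availability and freshness.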

Hope these reference links help.


Guozhang

On Wed, Mar 31, 2021 at 8:20 AM Hänninen, Mikko
<mi...@f-secure.com.invalid> wrote:

> Hello,
>
>
>
> I’m working on an application that leverages Kafka and Kafka Streams. I have
> some issues with application startup that I’ve been unable to solve myself,
> even with the help of my teammates, so I’m writing here in the hope that
> someone can help. I’d be very grateful.
>
>
>
>
>
> Application description:
>
>
>
> The application runs in AWS and uses the AWS MSK service to operate the
> brokers. The application runs in parallel on multiple nodes; typically we
> have 3, but it’s meant to scale to tens if needed. The number of brokers is
> also currently 3. The Kafka version is 2.6.0, both in MSK and in the Kafka
> libraries included with the application.
>
>
>
> The application runs on the US west coast, while the Kafka brokers are in
> Europe, so there is some network lag in between. (There’s another group of 3
> servers also running in Europe, with a different application id configured,
> so the servers in a given geographic location have their own consumer
> groups.)
>
>
>
> The application uses a Kafka Streams ReadOnlyKeyValueStore to consume a
> Kafka topic, say topic R, which has key-value pairs. The key is a string or
> other multibyte value, the value is a serialised structure with a number
> (Long) and some other data. The application provides a REST API through
> which clients can make requests with some key, and the application returns
> the number from the value, which should be the latest number seen for the
> given key in the topic. The goal of the API is to respond within
> milliseconds, e.g. 5 or 10 ms or so. (This is the reason why the
> application servers are geographically far away from the brokers, to
> provide low latency in that location.)
>
>
>
> If the requested key is not local on a given server, the application
> determines which server has that key based on the Kafka metadata, and
> forwards the request to that server. This part works fine, at least in
> terms of Kafka use.
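>
> For reference, the forwarding decision is roughly like the following
> simplified sketch (method and variable names here are illustrative, not our
> actual code; thisHost is a HostInfo built from application.server):
>
>     KeyQueryMetadata meta = streams.queryMetadataForKey(
>             storeName, key, Serdes.String().serializer());
>     if (thisHost.equals(meta.getActiveHost())) {
>         return localStore.get(key);                      // local ReadOnlyKeyValueStore lookup
>     } else {
>         return forwardToPeer(meta.getActiveHost(), key); // HTTP call to the owning instance
>     }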
>
>
>
> The key space is expected to be very large, perhaps tens or hundreds of
> millions of keys, or more. The application is still in development, so we
> have not seen that many in practice yet; at most it’s probably a few
> thousand or tens of thousands of keys with generated test data.
>
>
>
>
>
> Problem description:
>
>
>
> The reason I’m writing here is to get help with Kafka/Kafka Streams startup
> issues. Sometimes, but much too frequently, when all the servers are
> restarted, e.g. due to deploying a new version of the application, some of
> the application instances do not start up cleanly.
>
>
>
> At first there was the error with the message “topic may have migrated to
> another instance”. This was eventually worked around by retrying for more
> than 10 minutes, after which there was apparently a rebalance and the server
> in question was able to synchronise with Kafka and join the consumer group.
> This still happens, and a startup time of over 10 minutes is not desirable,
> but at least it’s no longer blocking development.
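>
> The retry is basically a loop around fetching the store handle; roughly like
> this simplified sketch (not our exact code, and the store name is referenced
> via a storeName variable defined elsewhere):
>
>     while (true) {
>         try {
>             return streams.store(StoreQueryParameters.fromNameAndType(
>                     storeName, QueryableStoreTypes.<String, Long>keyValueStore()));
>         } catch (InvalidStateStoreException e) {
>             // "may have migrated to another instance": store not ready yet, retry
>             Thread.sleep(5_000L); // enclosing method declares throws InterruptedException
>         }
>     }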
>
>
>
> Now there’s a second startup issue, with an exception
> org.apache.kafka.common.errors.DisconnectException being thrown by
> org.apache.kafka.clients.FetchSessionHandler with the message “Error
> sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2:”
>
>
>
> Before the timeout there’s a restore log message “stream-thread
> [query-api-us-west-2-0943f8d4-1720-4b3b-904d-d2efa190a135-StreamThread-1]
> Restoration in progress for 20 partitions.” followed by a dump of the 20
> partitions, e.g.
> “{query-api-us-west-2-query-api-us-west-2-prevalence-ratings-changelog-49:
> position=0, end=37713, totalRestored=0}” -- the position and totalRestored
> are always 0.
>
> The partitions are for the changelog topic associated with the
> above-mentioned topic R. There are 60 partitions in total in R, so 20
> matches the expected count per server (60/3). I’m assuming the number of
> partitions in the changelog is the same as in the actual topic.
>
>
>
> These log messages repeat every 31 seconds or so.
>
>
>
> Kafka Streams state does not reach RUNNING; the application waits for that
> to happen before it starts serving requests.
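>
> We gate the REST endpoints on that state transition, roughly like this
> simplified sketch:
>
>     CountDownLatch running = new CountDownLatch(1);
>     streams.setStateListener((newState, oldState) -> {
>         if (newState == KafkaStreams.State.RUNNING) {
>             running.countDown();
>         }
>     });
>     streams.start();
>     running.await(); // requests are only served after this returns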
>
>
>
> This error can persist even if the application is restarted.
>
>
>
> I’ve looked into network issues, but there don’t seem to be any. At times
> the servers run fine, so this seems to be intermittent. Also, it’s possible
> to use the command-line Kafka tools, e.g. kafka-topics.sh, to list the
> topics, so communication with the Kafka brokers can work just fine from the
> server even while the application is stuck in the failing state. The issue
> seems to be somewhere in the application, quite likely in the
> configuration.
>
>
>
> I have tried to increase the configuration value fetch.max.wait.ms from
> 500 (the default) to 1000 and even to 10000 with no apparent effect.
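>
> (For reference, a consumer-level override like this is passed through the
> Streams consumer prefix, roughly:
>
>     props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 10000);
>     // equivalent to props.put("consumer.fetch.max.wait.ms", 10000);
>
> so the embedded consumers should pick it up.)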
>
>
>
>
>
> There do not seem to be any issues with the brokers themselves. There are
> no errors in the logs, and all the metrics recommended by AWS for MSK look
> normal.
>
>
>
>
>
>
>
> Kafka Streams configuration values are below; most are defaults:
>
>
>
> StreamsConfig values:
>
>
>
> acceptable.recovery.lag = 10000
> application.id = query-api-us-west-2
> application.server = ip-10-200-246-134.us-west-2.compute.internal:8080
> bootstrap.servers = [
> b-3.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094,
> b-1.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094,
> b-2.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094]
> buffered.records.per.partition = 1000
> built.in.metrics.version = latest
> cache.max.bytes.buffering = 10485760
> client.id =
> commit.interval.ms = 30000
> connections.max.idle.ms = 540000
> default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
> default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> default.value.serde = class org.apache.kafka.common.serialization.Serdes$LongSerde
> max.task.idle.ms = 0
> max.warmup.replicas = 2
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 0
> num.stream.threads = 1
> partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> probing.rebalance.interval.ms = 600000
> processing.guarantee = at_least_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 1
> request.timeout.ms = 40000
> retries = 0
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = ssl
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /data/query-api/kafka-streams
> topology.optimization = none
> upgrade.from = null
> windowstore.changelog.additional.retention.ms = 86400000
>
>
>
>
>
> ConsumerConfig values:
>
>
>
> allow.auto.create.topics = false
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [
> b-3.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094,
> b-1.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094,
> b-2.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = query-api-us-west-2-0943f8d4-1720-4b3b-904d-d2efa190a135-StreamThread-1-consumer
> client.rack =
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = query-api-us-west-2
> group.instance.id = null
> heartbeat.interval.ms = 10000
> interceptor.classes = []
> internal.leave.group.on.close = false
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 1000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = GSSAPI
> security.protocol = ssl
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 30000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.3
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>
>
>
>
>
> Best regards,
>
> Mikko Hänninen
>
> --
>
> *Mikko Hänninen*
>
> Senior Developer, Security Research and Technologies
>
> F-Secure
>
> mikko.hanninen@f-secure.com
>


-- 
-- Guozhang