Posted to jira@kafka.apache.org by "Seweryn Habdank-Wojewodzki (JIRA)" <ji...@apache.org> on 2017/11/24 10:25:00 UTC

[jira] [Updated] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception

     [ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Seweryn Habdank-Wojewodzki updated KAFKA-6260:
----------------------------------------------
    Summary: AbstractCoordinator not clearly handles NULL Exception  (was: KafkaStream 1.0.0 not clearly handles NULL Exception)

> AbstractCoordinator not clearly handles NULL Exception
> ------------------------------------------------------
>
>                 Key: KAFKA-6260
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6260
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.0.0
>            Reporter: Seweryn Habdank-Wojewodzki
>
> The error reporting is unclear, but it seems that the Kafka heartbeat thread shuts down the application because of a NullPointerException triggered by "fake" disconnections.
> One more comment: we are processing messages in the stream, but sometimes we have to block processing for minutes, because the consumers cannot handle too much load. Is it possible that the heartbeat is also blocked while the stream is waiting?
> Can you check that?
> {code}
> 2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Received successful Heartbeat response
> 2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Sending Heartbeat request to coordinator cljp01.eb.lan.at:9093 (id: 2147483646 rack: null)
> 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Sending HEARTBEAT {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08} with correlation id 24 to node 2147483646
> 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Completed receive from node 2147483646 for HEARTBEAT with correlation id 24, received {throttle_time_ms=0,error_code=0}
> 2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Received successful Heartbeat response
> 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout.
> 2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Cancelled request {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]} with correlation id 21 due to node 1 being disconnected
> 2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH, apiVersion=6, clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, correlationId=21) with correlation id 21 due to node 1 being disconnected
> 2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Fetch request {clj_internal_topic-6=(offset=211558395, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-8=(offset=210178209, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-0=(offset=209353523, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-2=(offset=209291462, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-4=(offset=210728595, logStartOffset=-1, maxBytes=1048576)} to cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) failed org.apache.kafka.common.errors.DisconnectException: null
> 2017-11-23 23:54:52 TRACE NetworkClient:123 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Found least loaded node cljp01.eb.lan.at:9093 (id: 1 rack: DC-1)
> 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Initialize connection to node cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) for sending metadata request
> 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Initiating connection to node cljp01.eb.lan.at:9093 (id: 1 rack: DC-1)
> 2017-11-23 23:54:52 ERROR AbstractCoordinator:296 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Heartbeat thread for group kafka-endpoint failed due to unexpected error
> java.lang.NullPointerException: null
>         at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:436) ~[my-kafka-endpoint.jar:?]
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:395) ~[my-kafka-endpoint.jar:?]
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) ~[my-kafka-endpoint.jar:?]
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) ~[my-kafka-endpoint.jar:?]
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:275) ~[my-kafka-endpoint.jar:?]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:934) [my-kafka-endpoint.jar:?]
> 2017-11-23 23:54:52 DEBUG AbstractCoordinator:177 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Heartbeat thread has closed
> 2017-11-23 23:55:16 INFO  KafkaElasticsearchEndpoint:61 - Got shutdown requests ...
> 2017-11-23 23:55:16 INFO  StreamProcessor:106 - Streams closing ...
> 2017-11-23 23:55:16 INFO  StreamProcessor:111 - with 5 [s] timeout
> 2017-11-23 23:55:16 DEBUG KafkaStreams:183 - stream-client [kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31]Stopping Streams client with timeoutMillis = 5000 ms.
> 2017-11-23 23:55:16 INFO  KafkaStreams:346 - stream-client [kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31]State transition from RUNNING to PENDING_SHUTDOWN
> 2017-11-23 23:55:16 INFO  StreamThread:336 - stream-thread [kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1] Informed to shut down
> 2017-11-23 23:55:16 INFO  StreamThread:346 - stream-thread [kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
> {code}
> Kafka settings:
> {code}
>         auto.commit.interval.ms = 5000
>         auto.offset.reset = earliest
>         bootstrap.servers = [xxx:9093, yyy:9093]
>         check.crcs = true
>         client.id = kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer
>         connections.max.idle.ms = 540000
>         enable.auto.commit = false
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 6000
>         fetch.min.bytes = 1
>         group.id = kafka-endpoint
>         heartbeat.interval.ms = 3000
>         interceptor.classes = null
>         internal.leave.group.on.close = false
>         isolation.level = read_uncommitted
>         key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 10000000
>         max.poll.records = 2000
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
>         receive.buffer.bytes = 65536
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 7000
>         retry.backoff.ms = 100
>         sasl.jaas.config = null
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.mechanism = GSSAPI
>         security.protocol = SSL
>         send.buffer.bytes = 131072
>         session.timeout.ms = 6000
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.endpoint.identification.algorithm = null
>         ssl.key.password = [hidden]
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.location = /data/my/etc/kafka/client-keystore
>         ssl.keystore.password = [hidden]
>         ssl.keystore.type = JKS
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.secure.random.implementation = SHA1PRNG
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.location = /data/my/etc/kafka/truststore
>         ssl.truststore.password = [hidden]
>         ssl.truststore.type = JKS
>         value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> {code}
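
An editorial aside on the timing values in the settings above: the log shows a FETCH with max_wait_time=6000 being cancelled "due to request timeout" while request.timeout.ms is 7000. The following is a minimal sketch, not Kafka code, that checks how much headroom those settings leave. The class `TimingCheck`, its method names and the 2000 ms margin are hypothetical and chosen only for illustration; the one-third rule for heartbeat.interval.ms is taken from the Kafka consumer configuration documentation. It does not explain the NullPointerException itself.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: flags tight consumer timing settings.
class TimingCheck {

    // Assumed safety margin in milliseconds; an illustrative choice, not a Kafka default.
    static final long MIN_MARGIN_MS = 2000;

    // The four timing values copied from the settings dump in the report.
    static Map<String, Long> reported() {
        Map<String, Long> cfg = new LinkedHashMap<>();
        cfg.put("heartbeat.interval.ms", 3000L);
        cfg.put("session.timeout.ms", 6000L);
        cfg.put("fetch.max.wait.ms", 6000L);
        cfg.put("request.timeout.ms", 7000L);
        return cfg;
    }

    // Returns one warning per relation that looks tight; the list is empty if none do.
    static List<String> check(Map<String, Long> cfg) {
        long heartbeat = cfg.get("heartbeat.interval.ms");
        long session = cfg.get("session.timeout.ms");
        long fetchWait = cfg.get("fetch.max.wait.ms");
        long request = cfg.get("request.timeout.ms");

        List<String> warnings = new ArrayList<>();
        // The consumer docs suggest heartbeat.interval.ms no higher than 1/3 of session.timeout.ms.
        if (heartbeat * 3 > session) {
            warnings.add("heartbeat.interval.ms=" + heartbeat
                    + " is more than 1/3 of session.timeout.ms=" + session);
        }
        // Assumption: a fetch may wait up to fetch.max.wait.ms on the broker, so the
        // request timeout needs headroom above it (and above the session timeout).
        long margin = request - Math.max(session, fetchWait);
        if (margin < MIN_MARGIN_MS) {
            warnings.add("request.timeout.ms=" + request
                    + " leaves only " + margin + " ms of headroom");
        }
        return warnings;
    }

    public static void main(String[] args) {
        check(reported()).forEach(System.out::println);
    }
}
```

With the reported values, both relations are flagged: the heartbeat interval is half of the session timeout, and the request timeout is only 1000 ms above fetch.max.wait.ms. Whether that narrow gap is what produces the "fake" disconnections is a guess that would need to be confirmed against the broker logs.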


