Posted to users@kafka.apache.org by Tony Liu <ji...@zuora.com> on 2016/12/19 01:29:59 UTC

Timeout publishing message to Kafka cluster.

Hi,

Recently, we have been running into the `batch expired` error every few days,
maybe every 3 to 5 days; there is no fixed frequency.

*A,* the error is:
Exception Class : org.apache.kafka.common.errors.TimeoutException
Error Message : Batch Expired

*B*: server.log from Kafka:

[2016-12-18 20:45:32,371] INFO  Partition [thl_raw,43] on broker 1002:
Shrinking ISR for partition [thl_raw,43] from 1006,1001,1002 to 1002
(kafka.cluster.Partition)
[2016-12-18 20:45:32,376] INFO  Partition [HeartBit,6] on broker 1002:
Shrinking ISR for partition [HeartBit,6] from 1005,1006,1002 to 1002
(kafka.cluster.Partition)
[2016-12-18 20:45:32,378] INFO  Partition [thl_raw,31] on broker 1002:
Shrinking ISR for partition [thl_raw,31] from 1005,1004,1002 to 1002
(kafka.cluster.Partition)
[2016-12-18 20:45:32,382] INFO  Partition [HeartBit,0] on broker 1002:
Shrinking ISR for partition [HeartBit,0] from 1004,1005,1002 to 1002
(kafka.cluster.Partition)
[2016-12-18 20:45:32,384] INFO  Partition [ConnectorSync,7] on broker 1002:
Shrinking ISR for partition [ConnectorSync,7] from 1001,1002,1003 to 1002
(kafka.cluster.Partition)
[2016-12-18 20:45:32,386] INFO  Partition [__consumer_offsets,8] on broker
1002: Shrinking ISR for partition [__consumer_offsets,8] from
1005,1004,1002 to 1002 (kafka.cluster.Partition)
[2016-12-18 20:45:32,389] INFO  Partition [thl_raw,37] on broker 1002:
Shrinking ISR for partition [thl_raw,37] from 1005,1006,1002 to 1002
(kafka.cluster.Partition)
[2016-12-18 20:45:32,391] INFO  Partition [HeartBeat,3] on broker 1002:
Shrinking ISR for partition [HeartBeat,3] from 1005,1004,1002 to 1002
(kafka.cluster.Partition)
[2016-12-18 21:17:59,888] INFO  Rolled new log segment for
'__consumer_offsets-46' in 1 ms. (kafka.log.Log)
[2016-12-18 21:19:07,923] INFO  Deleting segment 0 from log
__consumer_offsets-46. (kafka.log.Log)
[2016-12-18 21:19:07,923] INFO  Deleting segment 101935860 from log
__consumer_offsets-46. (kafka.log.Log)
[2016-12-18 21:19:07,924] INFO  Deleting index
/kafka/data/__consumer_offsets-46/00000000000000000000.index.deleted
(kafka.log.OffsetIndex)
[2016-12-18 21:19:07,924] INFO  Deleting index
/kafka/data/__consumer_offsets-46/00000000000101935860.index.deleted
(kafka.log.OffsetIndex)
[2016-12-18 21:19:07,924] INFO  Deleting index
/kafka/data/__consumer_offsets-46/00000000000000000000.timeindex.deleted
(kafka.log.TimeIndex)
[2016-12-18 21:19:07,924] INFO  Deleting index
/kafka/data/__consumer_offsets-46/00000000000101935860.timeindex.deleted
(kafka.log.TimeIndex)
[2016-12-18 21:19:08,393] INFO  Deleting segment 102963875 from log
__consumer_offsets-46. (kafka.log.Log)
[2016-12-18 21:19:08,410] INFO  Deleting index
/kafka/data/__consumer_offsets-46/00000000000102963875.index.deleted
(kafka.log.OffsetIndex)
[2016-12-18 21:19:08,410] INFO  Deleting index
/kafka/data/__consumer_offsets-46/00000000000102963875.timeindex.deleted
(kafka.log.TimeIndex)
[2016-12-18 21:48:53,007] INFO  Rolled new log segment for 'thl_raw-24' in
1 ms. (kafka.log.Log)
[2016-12-18 22:15:09,894] INFO  Rolled new log segment for 'thl_raw-1' in 0
ms. (kafka.log.Log)
[2016-12-18 23:34:28,526] INFO  Rolled new log segment for 'thl_raw-9' in 1
ms. (kafka.log.Log)
[2016-12-18 23:34:28,754] INFO  Rolled new log segment for 'thl_raw-39' in
0 ms. (kafka.log.Log)
[2016-12-18 23:34:28,786] INFO  Rolled new log segment for 'thl_raw-7' in 0
ms. (kafka.log.Log)
[2016-12-19 00:04:32,816] INFO  Rolled new log segment for 'thl_raw-15' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:33,049] INFO  Rolled new log segment for 'thl_raw-44' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:33,137] INFO  Rolled new log segment for 'thl_raw-20' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:33,305] INFO  Rolled new log segment for 'thl_raw-40' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:33,380] INFO  Rolled new log segment for 'thl_raw-59' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:33,470] INFO  Rolled new log segment for 'thl_raw-50' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:33,630] INFO  Rolled new log segment for 'thl_raw-35' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:33,995] INFO  Rolled new log segment for 'thl_raw-45' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:34,007] INFO  Rolled new log segment for 'thl_raw-34' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:34,265] INFO  Rolled new log segment for 'thl_raw-48' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:34,359] INFO  Rolled new log segment for 'thl_raw-54' in
1 ms. (kafka.log.Log)
[2016-12-19 00:04:34,367] INFO  Rolled new log segment for 'thl_raw-10' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:34,540] INFO  Rolled new log segment for 'thl_raw-2' in 0
ms. (kafka.log.Log)
[2016-12-19 00:04:35,123] INFO  Rolled new log segment for 'thl_raw-14' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:36,822] INFO  Rolled new log segment for 'thl_raw-29' in
0 ms. (kafka.log.Log)
[2016-12-19 00:04:36,970] INFO  Rolled new log segment for 'thl_raw-18' in
0 ms. (kafka.log.Log)
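
To see which followers were dropped, the `Shrinking ISR` lines above can be tallied with a short script. This is only a sketch: the regular expression is an assumption based on the log format shown in this excerpt, and `dropped_followers` is a hypothetical helper name, not part of any Kafka tooling.

```python
import re
from collections import Counter

# Pattern inferred from the excerpt above (an assumption, not an official format).
# \s+ tolerates the line wrapping that the mail client added around "from"/"to".
SHRINK_RE = re.compile(
    r"Shrinking ISR for partition \[([^,\]]+),(\d+)\]\s+from\s+([\d,]+)\s+to\s+([\d,]+)"
)

def dropped_followers(log_text):
    """Count how often each broker id was removed from an ISR."""
    counts = Counter()
    for _topic, _partition, before, after in SHRINK_RE.findall(log_text):
        removed = set(before.split(",")) - set(after.split(","))
        counts.update(removed)
    return counts

# Two entries copied from the server.log excerpt above.
sample = """
[2016-12-18 20:45:32,371] INFO  Partition [thl_raw,43] on broker 1002:
Shrinking ISR for partition [thl_raw,43] from 1006,1001,1002 to 1002
(kafka.cluster.Partition)
[2016-12-18 20:45:32,376] INFO  Partition [HeartBit,6] on broker 1002:
Shrinking ISR for partition [HeartBit,6] from 1005,1006,1002 to 1002
(kafka.cluster.Partition)
"""

for broker, n in sorted(dropped_followers(sample).items()):
    print(broker, n)
```

In the excerpt above, every shrink leaves only broker 1002 (the leader that logged the message) in the ISR.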

*C*, whenever this kind of error happens, we always see replication
problems, like:

Topics (kafka-manager view of cluster fjord-staging2)

Topic                      # Partitions  # Brokers  Brokers Spread %  Brokers Skew %  # Replicas  Under Replicated %  Producer Message/Sec
__consumer_offsets         50            6          100               0               3           16                  0.00
ConnectorSync              8             6          100               16              3           25                  0.00
EventInstance              8             6          100               16              3           12                  0.00
fjord_healthy_checker      8             6          100               16              3           12                  0.00
HeartBeat                  8             6          100               16              3           12                  0.00
HeartBit                   8             6          100               0               3           25                  0.00
Notification               8             6          100               33              3           12                  0.00
NotificationEventInstance  8             6          100               16              3           12                  0.00
thl_raw                    64            6          100               0               3           17                  0.00

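
The percentages above can be turned back into rough partition counts. This is a sketch under an assumption: that the tool displays whole-number percentages, so the counts are estimates rather than values read from the cluster. `estimated_under_replicated` is a hypothetical helper name.

```python
# (partitions, under-replicated %) per topic, transcribed from the table above.
topics = {
    "__consumer_offsets": (50, 16),
    "ConnectorSync": (8, 25),
    "EventInstance": (8, 12),
    "fjord_healthy_checker": (8, 12),
    "HeartBeat": (8, 12),
    "HeartBit": (8, 25),
    "Notification": (8, 12),
    "NotificationEventInstance": (8, 12),
    "thl_raw": (64, 17),
}

def estimated_under_replicated(partitions, percent):
    """Estimate the partition count behind a whole-number percentage."""
    return round(partitions * percent / 100)

estimates = {
    name: estimated_under_replicated(p, pct) for name, (p, pct) in topics.items()
}
for name, count in estimates.items():
    print(f"{name}: ~{count} of {topics[name][0]} partitions")
print("total:", sum(estimates.values()))
```

For the 8-partition topics, 12% corresponds to a single partition (1/8 = 12.5%) and 25% to two.
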
*D*, All of the replication problems seem to be related to node `1002` (click
into each topic and every affected partition looks like the `*blue
highlight*` row below):
Partition Information

Partition  Latest Offset  Leader  Replicas            In Sync Replicas  Preferred Leader?  Under Replicated?
0                         1005    (1005,1001,1002)    (1005,1002,1001)  true               false
1                         1006    (1006,1002,1003)    (1006,1003,1002)  true               false
2                         1001    (1001,1003,1004)    (1004,1003,1001)  true               false
3                         *1002*  *(1002,1004,1005)*  *(1002)*          *true*             *true*
4                         1003    (1003,1005,1006)    (1003,1006,1005)  true               false
5                         1004    (1004,1006,1001)    (1004,1001,1006)  true               false
6                         1005    (1005,1002,1003)    (1003,1005,1002)  true               false
7                         1006    (1006,1003,1004)    (1003,1006,1004)  true               false
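
The "Under Replicated?" column can be reproduced by hand, assuming the usual Kafka meaning of the term: a partition is under-replicated when its in-sync replica set is missing at least one assigned replica. A minimal sketch with the rows transcribed from the table above; `Partition` and `under_replicated` are illustrative names, not a Kafka API.

```python
from collections import namedtuple

Partition = namedtuple("Partition", "id leader replicas isr")

# Rows transcribed from the partition table above.
partitions = [
    Partition(0, 1005, (1005, 1001, 1002), (1005, 1002, 1001)),
    Partition(1, 1006, (1006, 1002, 1003), (1006, 1003, 1002)),
    Partition(2, 1001, (1001, 1003, 1004), (1004, 1003, 1001)),
    Partition(3, 1002, (1002, 1004, 1005), (1002,)),
    Partition(4, 1003, (1003, 1005, 1006), (1003, 1006, 1005)),
    Partition(5, 1004, (1004, 1006, 1001), (1004, 1001, 1006)),
    Partition(6, 1005, (1005, 1002, 1003), (1003, 1005, 1002)),
    Partition(7, 1006, (1006, 1003, 1004), (1003, 1006, 1004)),
]

def under_replicated(parts):
    """Return partitions whose ISR is a strict subset of the assigned replicas."""
    return [p for p in parts if set(p.isr) < set(p.replicas)]

for p in under_replicated(partitions):
    missing = sorted(set(p.replicas) - set(p.isr))
    print(f"partition {p.id}: leader {p.leader}, missing from ISR: {missing}")
```

Only partition 3 is flagged, the one led by 1002. Broker 1002 is still in the ISR of partitions 0, 1 and 6, where it is a follower, so the problem in this table shows up only where 1002 is the leader.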

Re: Timeout publishing message to Kafka cluster.

Posted by Tony Liu <ji...@zuora.com>.
If any of you have any idea about this issue, please let me know.

I am still confused about a few points:

1) Why does restarting node `1002` automatically resolve the `replication
issue`?
2) Could this issue be caused by the `log retention` settings, e.g.
`log.segment.bytes=136870912`? Is that too small (about 137 MB per log
file)?

I still suspect this issue is mostly caused by my `server.properties`, but
I have no idea which part is wrong.
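
As a rough sanity check on point 2, the segment size can be converted to more readable units and compared with Kafka's stock default for `log.segment.bytes` (1073741824 bytes, i.e. 1 GiB). This is plain arithmetic, not a diagnosis; `describe` is a hypothetical helper name.

```python
SEGMENT_BYTES = 136870912           # log.segment.bytes from the posted config
DEFAULT_SEGMENT_BYTES = 1073741824  # Kafka's stock default (1 GiB)

def describe(n_bytes):
    """Return the size as (decimal MB, binary MiB)."""
    return n_bytes / 1_000_000, n_bytes / (1024 * 1024)

mb, mib = describe(SEGMENT_BYTES)
ratio = DEFAULT_SEGMENT_BYTES / SEGMENT_BYTES
print(f"{mb:.1f} MB = {mib:.1f} MiB")
print(f"the default is about {ratio:.1f}x larger")
```

This only puts the number in context: the configured segment is roughly one eighth of the default size, so segments roll more often, but that alone does not show that segment size is related to the ISR shrinking.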

Thanks.

> On Sun, Dec 18, 2016 at 5:35 PM, Tony Liu <ji...@zuora.com> wrote:
>
>> When that error happens, I need to manually restart Kafka node `1002`.
>> Once the restart finishes, all of the partitions are healthy again.
>>
>> E.g.
>> *Before restart:*
>> 3 *1002*
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1002>
>> *(1002,1004,1005)* *(1002)* *true* *true*
>>
>> *After restart:*
>> 3 *1002*
>> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1002>
>> *(1002,1004,1005)* *(1002,1004,1005)* *true* *true*

Re: Timeout publishing message to Kafka cluster.

Posted by Tony Liu <ji...@zuora.com>.
Posting the configuration here for help:

[root@2494f8e6fb37 config]# vi server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

# Reference :
# 1), https://kafka.apache.org/documentation#configuration
# 2), https://kafka.apache.org/documentation#prodconfig

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=-1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=8

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/kafka/data

# Enable auto creation of topic on the server
auto.create.topics.enable=true

# Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off
delete.topic.enable=true

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# By default, we create 8 partitions for each topic; if we want to increase the number, we need to enlarge it manually.
num.partitions=8

# default replication factors for automatically created topics
default.replication.factor=3

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

# The interval with which we add an entry to the offset index
log.index.interval.bytes=4096

# The maximum size in bytes of the offset index
log.index.size.max.bytes=10485760

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=20000

# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=10000

# The frequency in ms that the log flusher checks whether any log needs to be flushed to disk
log.flush.scheduler.interval.ms=20000

# The frequency with which we update the persistent record of the last flush which acts as the log recovery point
log.flush.offset.checkpoint.interval.ms=60000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The
# policy can be set to delete segments after a period of time, or after a
# given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met.
# Deletion always happens from the end of the log.

# The default cleanup policy for segments beyond the retention window. A
# comma separated list of valid policies. Valid policies are: "delete" and
# "compact"
log.cleanup.policy=delete

# The minimum age of a log file to be eligible for deletion
log.retention.hours=36

# A size-based retention policy for logs. Segments are pruned from the log
# as long as the remaining segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new
# log segment will be created.
log.segment.bytes=136870912

# The interval at which log segments are checked to see if they can be
# deleted according to the retention policies
log.retention.check.interval.ms=300000

# The maximum time before a new log segment is rolled out (in hours),
# secondary to log.roll.ms property
log.roll.hours=12

# The amount of time to wait before deleting a file from the filesystem
log.segment.delete.delay.ms=60000

# The maximum size of message that the server can receive
message.max.bytes=10485760

############################# Replication #############################

# Number of fetcher threads used to replicate messages from a source
# broker. Increasing this value can increase the degree of I/O parallelism
# in the follower broker.
num.replica.fetchers=4

# The number of bytes of messages to attempt to fetch for each partition.
# This is not an absolute maximum: if the first message in the first
# non-empty partition of the fetch is larger than this value, the message
# will still be returned to ensure that progress can be made.
# The maximum message size accepted by the broker is defined via
# message.max.bytes (broker config) or max.message.bytes (topic config).
replica.fetch.max.bytes=10485760

# Max wait time for each fetcher request issued by follower replicas.
# This value should always be less than replica.lag.time.max.ms to prevent
# frequent shrinking of ISR for low throughput topics
replica.fetch.wait.max.ms=500

# The frequency with which the high watermark is saved out to disk
replica.high.watermark.checkpoint.interval.ms=5000

# The socket timeout for network requests
replica.socket.timeout.ms=30000

# The socket receive buffer for network requests
replica.socket.receive.buffer.bytes=65536

# If a follower hasn't sent any fetch requests or hasn't consumed up to the
# leader's log end offset for at least this time, the leader will remove the
# follower from the ISR
replica.lag.time.max.ms=10000

# The socket timeout for controller-to-broker channels
controller.socket.timeout.ms=30000

############################# Zookeeper #############################

# The max time that the client waits to establish a connection to
# zookeeper. If not set, the value in zookeeper.session.timeout.ms is used
zookeeper.connection.timeout.ms=6000

# Zookeeper session timeout
zookeeper.session.timeout.ms=6000

# Set client to use secure ACLs
zookeeper.set.acl=false

# How far a ZK follower can be behind a ZK leader
zookeeper.sync.time.ms=2000


zookeeper.connect=10.105.212.11:2181,10.105.211.109:2181,10.105.210.103:2181
jvm.performance.opts=-javaagent:/opt/jolokia/jolokia-jvm-1.3.5-agent.jar=host=
port=9092
advertised.host.name=10.105.210.24
advertised.port=9092
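
The comment on replica.fetch.wait.max.ms above states one concrete rule: the
value should stay below replica.lag.time.max.ms, otherwise followers of
low-throughput topics may be dropped from the ISR. A minimal sketch of
checking that rule against a properties file could look like the following.
The helper names (parse_properties, check_replica_timing) and the SAMPLE
text are hypothetical and only for illustration; they are not part of Kafka.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_replica_timing(props):
    """Return warnings if the fetch wait is not below the ISR lag limit."""
    warnings = []
    fetch_wait = int(props["replica.fetch.wait.max.ms"])
    lag_max = int(props["replica.lag.time.max.ms"])
    if fetch_wait >= lag_max:
        warnings.append(
            "replica.fetch.wait.max.ms (%d) should be less than "
            "replica.lag.time.max.ms (%d)" % (fetch_wait, lag_max)
        )
    return warnings


# Values copied from the configuration posted above.
SAMPLE = """
# replication settings
replica.fetch.wait.max.ms=500
replica.lag.time.max.ms=10000
"""

print(check_replica_timing(parse_properties(SAMPLE)))  # prints []
```

With the values posted here the check passes, so this particular rule does
not by itself explain the ISR shrinking.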

On Sun, Dec 18, 2016 at 5:35 PM, Tony Liu <ji...@zuora.com> wrote:

> When that error happens, I have to manually restart Kafka node `1002`.
> Once the restart finishes, all of the partitions become healthy again.
>
> For example, partition 3 (columns: Partition, Leader, Replicas, In Sync
> Replicas, Preferred Leader?, Under Replicated?):
>
> *Before restart:*
> 3 *1002*
> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1002>
> *(1002,1004,1005)* *(1002)* *true* *true*
>
> *After restart:*
> 3 *1002*
> <http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1002>
> *(1002,1004,1005)* *(1002,1004,1005)* *true* *true*
> ​

Re: Timeout publishing message to Kafka cluster.

Posted by Tony Liu <ji...@zuora.com>.
When that error happens, I have to manually restart Kafka node `1002`. Once
the restart finishes, all of the partitions become healthy again.

For example, partition 3 (columns: Partition, Leader, Replicas, In Sync
Replicas, Preferred Leader?, Under Replicated?):

*Before restart:*
3 *1002*
<http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1002>
*(1002,1004,1005)* *(1002)* *true* *true*

*After restart:*
3 *1002*
<http://fjord-staging2-kafka-manager.infradev.zuora.com:9000/clusters/fjord-staging2/brokers/1002>
*(1002,1004,1005)* *(1002,1004,1005)* *true* *true*
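
To make the before/after comparison concrete, here is a small sketch that
lists which assigned replicas are missing from the in-sync set. The function
name missing_from_isr is hypothetical; the broker ids are the ones reported
for partition 3 in this message.

```python
def missing_from_isr(replicas, isr):
    """Return assigned replicas that are not in the ISR, in assignment order."""
    in_sync = set(isr)
    return [broker for broker in replicas if broker not in in_sync]


# Partition 3 as reported in this thread.
before = {"replicas": [1002, 1004, 1005], "isr": [1002]}
after = {"replicas": [1002, 1004, 1005], "isr": [1002, 1004, 1005]}

print(missing_from_isr(before["replicas"], before["isr"]))  # [1004, 1005]
print(missing_from_isr(after["replicas"], after["isr"]))    # []
```

A partition counts as under-replicated whenever this list is non-empty. In
the "before" state only the leader, 1002, remains in sync.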
​

