You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Jason Gustafson (Jira)" <ji...@apache.org> on 2020/05/11 23:53:00 UTC

[jira] [Commented] (KAFKA-9669) Kafka 2.4.0 Chokes on Filebeat 5.6 Produced Data

    [ https://issues.apache.org/jira/browse/KAFKA-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104979#comment-17104979 ] 

Jason Gustafson commented on KAFKA-9669:
----------------------------------------

I think this is the result of stricter validation on the broker from KAFKA-8106. The "inner" offsets in v1 messages are expected to increase sequentially, but previously we would silently ignore this and rewrite the batch. In 2.4, this become a validation error, which broke compatibility for older clients.

> Kafka 2.4.0 Chokes on Filebeat 5.6 Produced Data
> ------------------------------------------------
>
>                 Key: KAFKA-9669
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9669
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0
>            Reporter: Weichu Liu
>            Assignee: Jason Gustafson
>            Priority: Major
>
> Hi
> In our environment, after upgrading to Kafka 2.4.0, we discovered the broker was not compatible with filebeat 5.
> Here is how to reproduce:
> 1. Startup Kafka 2.4.0, all configurations are vanilla:
> {code}
> $ kafka_2.13-2.4.0/bin/zookeeper-server-start.sh kafka_2.13-2.4.0/config/zookeeper.properties
> $ kafka_2.13-2.4.0/bin/kafka-server-start.sh kafka_2.13-2.4.0/config/server.properties
> {code}
> 2. Startup filebeat 5.6.16 with the following configuration. (downloaded from https://www.elastic.co/jp/downloads/past-releases/filebeat-5-6-16)
> {code}
> $ cat /tmp/filebeat.yml
> name: test
> output.kafka:
>   enabled: true
>   hosts:
>     - localhost:9092
>   topic: test-3
>   version: 0.10.0
>   compression: gzip
> filebeat:
>   prospectors:
>     - input_type: log
>       paths:
>         - /tmp/filebeat-in
>       encoding: plain
> {code}
> {code}
> $ filebeat-5.6.16-linux-x86_64/filebeat -e -c /tmp/filebeat.yml
> {code}
> 3. Write some lines to file {{/tmp/filebeat-in}}. Looks like single line won't trigger the issue, but 30 lines are enough.
> {code}
> seq 30 >> /tmp/filebeat-in
> {code}
> 4. Kafka throws the following error chunk, like, per produced record.
> {noformat}
> [2020-03-06 05:17:40,129] ERROR [ReplicaManager broker=0] Error processing append operation on partition test-3-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.InvalidRecordException: Inner record LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE, crc=1453875406, CreateTime=1583471854475, key=0 bytes, value=202 bytes)) inside the compressed record batch does not have incremental offsets, expected offset is 1 in topic partition test-3-0.
> [2020-03-06 05:17:40,129] ERROR [KafkaApi-0] Error when handling request: clientId=beats, correlationId=102, api=PRODUCE, version=2, body={acks=1,timeout=10000,partitionSizes=[test-3-0=272]} (kafka.server.KafkaApis)
> java.lang.NullPointerException: `field` must be non-null
> 	at java.base/java.util.Objects.requireNonNull(Objects.java:246)
> 	at org.apache.kafka.common.protocol.types.Struct.validateField(Struct.java:474)
> 	at org.apache.kafka.common.protocol.types.Struct.instance(Struct.java:418)
> 	at org.apache.kafka.common.protocol.types.Struct.instance(Struct.java:436)
> 	at org.apache.kafka.common.requests.ProduceResponse.toStruct(ProduceResponse.java:281)
> 	at org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:35)
> 	at org.apache.kafka.common.requests.RequestContext.buildResponse(RequestContext.java:80)
> 	at kafka.server.KafkaApis.sendResponse(KafkaApis.scala:2892)
> 	at kafka.server.KafkaApis.sendResponseCallback$2(KafkaApis.scala:554)
> 	at kafka.server.KafkaApis.$anonfun$handleProduceRequest$11(KafkaApis.scala:576)
> 	at kafka.server.KafkaApis.$anonfun$handleProduceRequest$11$adapted(KafkaApis.scala:576)
> 	at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:546)
> 	at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:577)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:126)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
> 	at java.base/java.lang.Thread.run(Thread.java:835)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)