Posted to dev@kafka.apache.org by "Christian Bates (Jira)" <ji...@apache.org> on 2022/05/05 15:18:00 UTC

[jira] [Created] (KAFKA-13876) Mirrormaker-2 consumer settings not working

Christian Bates created KAFKA-13876:
---------------------------------------

             Summary: Mirrormaker-2 consumer settings not working
                 Key: KAFKA-13876
                 URL: https://issues.apache.org/jira/browse/KAFKA-13876
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 2.6.1, 2.5.0
            Reporter: Christian Bates


I am starting a mirrormaker connect cluster using ./bin/connect-mirror-maker.sh and a properties file.

I followed the guide at https://github.com/apache/kafka/tree/trunk/connect/mirror to configure the consumer properties. However, no matter what I set, the consumer timeout stays fixed at the default. I confirmed this both from the config that Kafka prints to the log and by timing the interval between the disconnect messages I get.  For example, I set:

{{CLOUD_EU.consumer.request.timeout.ms=120000}}

I set this in the properties file that I start MM-2 with, but it is ignored.

 

Based on various guides I found while looking into this, I have also tried:

{{CLOUD_EU.request.timeout.ms=120000}}
{{CLOUD_EU.cluster.consumer.request.timeout.ms=120000}}
{{CLOUD_EU.consumer.override.request.timeout.ms=120000}}
{{CLOUD_EU.cluster.consumer.override.request.timeout.ms=120000}}

None of these worked.

How can I change the consumer `request.timeout.ms` setting?

 

Context:

I am trying to use MirrorMaker 2 to replicate data between two AWS managed Kafka (MSK) clusters in different AWS regions: one in eu-west-1 (CLOUD_EU) and the other in us-west-2 (CLOUD_NA), both running Kafka 2.6.1.  For testing, I am currently replicating topics one way only, from EU -> NA.

This works fine for topics with small messages, but one of my topics has binary messages up to 20MB in size.  When I try to replicate that topic, I get an error every 30 seconds:

{{[2022-04-21 13:47:05,268] INFO [Consumer clientId=consumer-29, groupId=null] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: {}. (org.apache.kafka.clients.FetchSessionHandler:481)}}
{{org.apache.kafka.common.errors.DisconnectException}}

 

With DEBUG logging enabled to get more information, we see:

{{[2022-04-21 13:47:05,267] DEBUG [Consumer clientId=consumer-29, groupId=null] Disconnecting from node 2 due to request timeout. (org.apache.kafka.clients.NetworkClient:784)}}
{{[2022-04-21 13:47:05,268] DEBUG [Consumer clientId=consumer-29, groupId=null] Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-29, correlationId=35) due to node 2 being disconnected (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:593)}}

It gets stuck in a loop, disconnecting with a request timeout every 30s and then retrying.

Looking at this, I suspected that `request.timeout.ms` was still at its default (30s) and that fetches were timing out while reading the topic with many large messages, which is why I was trying to override the consumer settings with a longer timeout.
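
As a rough sanity check of that suspicion, the sketch below works out the lowest sustained transfer rate at which a single 20MB message would arrive before the request timeout expires. It is a simplification under stated assumptions: it treats one fetch response as carrying exactly one maximum-size message and ignores broker-side wait time and protocol overhead. The helper name `min_throughput_mb_per_s` is hypothetical and exists only for this illustration.

```python
def min_throughput_mb_per_s(payload_mb: float, timeout_ms: int) -> float:
    """Lowest sustained rate (MB/s) at which payload_mb arrives before timeout_ms elapses."""
    return payload_mb / (timeout_ms / 1000.0)


# 20 MB is the largest message size mentioned in this report.
# 30000 ms is the 30s default timeout; 120000 ms is the attempted override.
for timeout_ms in (30_000, 120_000):
    rate = min_throughput_mb_per_s(20, timeout_ms)
    print(f"timeout={timeout_ms} ms -> at least {rate:.2f} MB/s for one 20 MB message")
```

If the real cross-region throughput per connection is below the first figure, a timeout at 30s would be expected, and a fetch response that carries several large messages would need proportionally more time.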

 

Properties file I am using to start the cluster:

 

# specify any number of cluster aliases
clusters = CLOUD_EU, CLOUD_NA

# connection information for each cluster
CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092
CLOUD_NA.bootstrap.servers = kafka.us-west-2.amazonaws.com:9092

# enable and configure individual replication flows
CLOUD_EU->CLOUD_NA.enabled = true
CLOUD_EU->CLOUD_NA.topics = METRICS_ATTACHMENTS_OVERSIZE_EU
CLOUD_NA->CLOUD_EU.enabled = false

replication.factor=3
tasks.max = 1

############################# Internal Topic Settings  #############################
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

############################    Kafka Settings    ###################################

# CLOUD_EU cluster overrides
CLOUD_EU.consumer.request.timeout.ms=120000
CLOUD_EU.consumer.session.timeout.ms=150000
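
To rule out a typo in the alias or prefix, a minimal sketch like the following can list which entries in a properties file sit under a given prefix. It is a simplified parser written for illustration only (it handles `key = value` lines and `#` comments, nothing more) and is not MirrorMaker 2's own config handling; `parse_properties` and `with_prefix` are hypothetical helper names, and the sample text is an excerpt of the file above.

```python
def parse_properties(text: str) -> dict:
    """Parse simple 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines that contain no '=' at all
            props[key.strip()] = value.strip()
    return props


def with_prefix(props: dict, prefix: str) -> dict:
    """Return the entries whose key starts with prefix, with the prefix stripped."""
    return {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}


# Excerpt of the properties file shown in this report.
SAMPLE = """\
clusters = CLOUD_EU, CLOUD_NA
CLOUD_EU.bootstrap.servers = kafka.eu-west-1.amazonaws.com:9092
CLOUD_EU.consumer.request.timeout.ms=120000
CLOUD_EU.consumer.session.timeout.ms=150000
"""

props = parse_properties(SAMPLE)
consumer_overrides = with_prefix(props, "CLOUD_EU.consumer.")
print(consumer_overrides)
```

This only shows what the file contains; whether MM-2 actually passes those values on to the consumer is the question this issue is about.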



--
This message was sent by Atlassian Jira
(v8.20.7#820007)