Posted to users@kafka.apache.org by Alessandro Tagliapietra <ta...@gmail.com> on 2021/04/10 04:02:06 UTC

Confluent replicator keeps restarting consumers

Hi everyone,

I'm trying to migrate from one cluster to another hosted on Confluent Cloud.
I'm using the trial version of Confluent Replicator, and it seems that it
keeps restarting its consumers.
I know Replicator isn't part of Kafka itself, but I think the error may
apply to any consumer, not just the one Replicator uses.
I've created a consumer.properties file like this:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=xxx.eu-west-1.aws.confluent.cloud:9092
retry.backoff.ms=500
offset.flush.timeout.ms=300000
max.poll.interval.ms=300000
max.poll.records=250
group.instance.id=replicator-0
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";
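
(As a sanity check that these credentials work at all, I believe a one-off
console consumer against a single topic should succeed; the topic name here
is just an example:

confluent-platform/bin/kafka-console-consumer \
  --bootstrap-server xxx.eu-west-1.aws.confluent.cloud:9092 \
  --consumer.config consumer.properties \
  --topic orders-output --from-beginning --max-messages 1
)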


and a producer.properties like this:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=xxx.eu-west-1.aws.confluent.cloud:9092
offset.flush.timeout.ms=50000
buffer.memory=335544
max.poll.interval.ms=300000
max.poll.records=250
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";
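
(If I read the docs correctly, max.poll.interval.ms and max.poll.records are
consumer-only settings and offset.flush.timeout.ms belongs to the Connect
worker, so I suspect the producer side effectively reduces to this sketch:

# producer.properties as I suspect the producer actually sees it;
# buffer.memory is the only tuning key above that is a real producer config
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=xxx.eu-west-1.aws.confluent.cloud:9092
buffer.memory=335544
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";
)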


(I've tried tuning the properties you see between bootstrap.servers and
sasl.jaas.config, but the behavior doesn't change.)

I run the command like this:

confluent-platform/bin/replicator --cluster.id replicator --consumer.config
consumer.properties --producer.config producer.properties --topic.regex '.*'
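
(To isolate the problem, I assume the same command also accepts a narrower
regex, e.g. matching a single topic:

confluent-platform/bin/replicator --cluster.id replicator \
  --consumer.config consumer.properties --producer.config producer.properties \
  --topic.regex 'orders-output'
)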

and what I see is this section of logs repeating continuously:

[2021-04-09 20:39:08,967] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = none
        bootstrap.servers = [xxx.eu-west-1.aws.confluent.cloud:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = replicator-0
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = replicator
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 250
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = PLAIN
        security.protocol = SASL_SSL
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        socket.connection.setup.timeout.max.ms = 127000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:361)
[2021-04-09 20:39:08,969] WARN The configuration 'offset.flush.timeout.ms'
was supplied but isn't a known config.
(org.apache.kafka.clients.consumer.ConsumerConfig:369)
[2021-04-09 20:39:08,969] WARN The configuration 'buffer.memory' was
supplied but isn't a known config.
(org.apache.kafka.clients.consumer.ConsumerConfig:369)
[2021-04-09 20:39:08,969] INFO Kafka version: 6.1.1-ce
(org.apache.kafka.common.utils.AppInfoParser:119)
[2021-04-09 20:39:08,969] INFO Kafka commitId: bed3428d56b4e9cb
(org.apache.kafka.common.utils.AppInfoParser:120)
[2021-04-09 20:39:08,969] INFO Kafka startTimeMs: 1618025948969
(org.apache.kafka.common.utils.AppInfoParser:121)
[2021-04-09 20:39:10,219] INFO [Consumer clientId=replicator-0,
groupId=replicator] Cluster ID: lkc-77d82
(org.apache.kafka.clients.Metadata:279)
[2021-04-09 20:39:13,209] INFO [Consumer clientId=replicator-0,
groupId=replicator] Discovered group coordinator
b10-xxx.eu-west-1.aws.confluent.cloud:9092 (id: 2147483637 rack: null)
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:844)
[2021-04-09 20:39:14,572] ERROR [Consumer clientId=replicator-0,
groupId=replicator] Offset commit failed on partition orders-output-0 at
offset 2163442: The coordinator is not aware of this member.
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1190)
[2021-04-09 20:39:14,572] INFO [Consumer clientId=replicator-0,
groupId=replicator] OffsetCommit failed with Generation{generationId=-1,
memberId='', protocol='null'}: The coordinator is not aware of this member.
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1246)
[2021-04-09 20:39:14,572] WARN Could not translate offsets for group ID
replicator.  There may be an active consumer group for this ID in the
destination cluster. Stop the consumer group in order for offset
translation to continue.
(io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslator:293)
[2021-04-09 20:39:14,572] INFO Metrics scheduler closed
(org.apache.kafka.common.metrics.Metrics:668)
[2021-04-09 20:39:14,572] INFO Closing reporter
org.apache.kafka.common.metrics.JmxReporter
(org.apache.kafka.common.metrics.Metrics:672)
[2021-04-09 20:39:14,573] INFO Metrics reporters closed
(org.apache.kafka.common.metrics.Metrics:678)
[2021-04-09 20:39:14,574] INFO App info kafka.consumer for replicator-0
unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
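
The WARN about offset translation above mentions an active consumer group
for this ID in the destination cluster; I assume I can inspect that group
with something like the following (a sketch, reusing consumer.properties for
the credentials):

confluent-platform/bin/kafka-consumer-groups \
  --bootstrap-server xxx.eu-west-1.aws.confluent.cloud:9092 \
  --command-config consumer.properties \
  --describe --group replicator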


Not sure if it matters, but both the consumer and the producer use the same
bootstrap server; I believe the clusters are separated by the login
credentials. I also notice that although I set group.instance.id=replicator-0
in consumer.properties, the ConsumerConfig logged above shows
group.instance.id = null, so that setting may not be applied at all.
Is there anything I can try to solve this? Replicator is moving messages, but
at less than 1 MB/s, which seems much slower than it should be.

Thank you in advance

--
Alessandro Tagliapietra