Posted to users@kafka.apache.org by Alessandro Tagliapietra <ta...@gmail.com> on 2021/04/10 04:02:06 UTC
Confluent replicator keeps restarting consumers
Hi everyone,
I'm trying to migrate from one cluster to another hosted on Confluent Cloud.
I'm using the trial version of Confluent Replicator, and it seems that it
keeps restarting its consumers.
I know that Replicator isn't part of Kafka itself, but I think the error
might apply to any consumer, not just the one used by Replicator.
I've created a consumer.properties file like this:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=xxx.eu-west-1.aws.confluent.cloud:9092
retry.backoff.ms=500
offset.flush.timeout.ms=300000
max.poll.interval.ms=300000
max.poll.records=250
group.instance.id=replicator-0
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";
and a producer.properties like this:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=xxx.eu-west-1.aws.confluent.cloud:9092
offset.flush.timeout.ms=50000
buffer.memory=335544
max.poll.interval.ms=300000
max.poll.records=250
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";
(I've tried tuning the properties you see between bootstrap.servers and
sasl.jaas.config a bit, but the behavior doesn't change.)
I run the command like this:
confluent-platform/bin/replicator --cluster.id replicator --consumer.config
consumer.properties --producer.config producer.properties --topic.regex '.*'
and I continuously see sections of logs like this:
[2021-04-09 20:39:08,967] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [xxx.eu-west-1.aws.confluent.cloud:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = replicator-0
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = replicator
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 250
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = PLAIN
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:361)
[2021-04-09 20:39:08,969] WARN The configuration 'offset.flush.timeout.ms'
was supplied but isn't a known config.
(org.apache.kafka.clients.consumer.ConsumerConfig:369)
[2021-04-09 20:39:08,969] WARN The configuration 'buffer.memory' was
supplied but isn't a known config.
(org.apache.kafka.clients.consumer.ConsumerConfig:369)
[2021-04-09 20:39:08,969] INFO Kafka version: 6.1.1-ce
(org.apache.kafka.common.utils.AppInfoParser:119)
[2021-04-09 20:39:08,969] INFO Kafka commitId: bed3428d56b4e9cb
(org.apache.kafka.common.utils.AppInfoParser:120)
[2021-04-09 20:39:08,969] INFO Kafka startTimeMs: 1618025948969
(org.apache.kafka.common.utils.AppInfoParser:121)
[2021-04-09 20:39:10,219] INFO [Consumer clientId=replicator-0,
groupId=replicator] Cluster ID: lkc-77d82
(org.apache.kafka.clients.Metadata:279)
[2021-04-09 20:39:13,209] INFO [Consumer clientId=replicator-0,
groupId=replicator] Discovered group coordinator
b10-xxx.eu-west-1.aws.confluent.cloud:9092 (id: 2147483637 rack: null)
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:844)
[2021-04-09 20:39:14,572] ERROR [Consumer clientId=replicator-0,
groupId=replicator] Offset commit failed on partition orders-output-0 at
offset 2163442: The coordinator is not aware of this member.
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1190)
[2021-04-09 20:39:14,572] INFO [Consumer clientId=replicator-0,
groupId=replicator] OffsetCommit failed with Generation{generationId=-1,
memberId='', protocol='null'}: The coordinator is not aware of this member.
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1246)
[2021-04-09 20:39:14,572] WARN Could not translate offsets for group ID
replicator. There may be an active consumer group for this ID in the
destination cluster. Stop the consumer group in order for offset
translation to continue.
(io.confluent.connect.replicator.offsets.ConsumerOffsetsTranslator:293)
[2021-04-09 20:39:14,572] INFO Metrics scheduler closed
(org.apache.kafka.common.metrics.Metrics:668)
[2021-04-09 20:39:14,572] INFO Closing reporter
org.apache.kafka.common.metrics.JmxReporter
(org.apache.kafka.common.metrics.Metrics:672)
[2021-04-09 20:39:14,573] INFO Metrics reporters closed
(org.apache.kafka.common.metrics.Metrics:678)
[2021-04-09 20:39:14,574] INFO App info kafka.consumer for replicator-0
unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
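Based on that last WARN about an active consumer group blocking offset
translation, I guess I could check whether a group named "replicator" already
exists in the destination cluster with something like this (untested; the
bootstrap server is the same placeholder as above, and I'm assuming my
consumer.properties works as a --command-config despite the unknown-config
warnings):

```shell
# Describe the "replicator" group in the destination cluster to see if it
# has active members (hostname/credentials are placeholders from above).
confluent-platform/bin/kafka-consumer-groups \
  --bootstrap-server xxx.eu-west-1.aws.confluent.cloud:9092 \
  --command-config consumer.properties \
  --describe --group replicator

# If the group exists with no connected members, it can be deleted so that
# offset translation can continue.
confluent-platform/bin/kafka-consumer-groups \
  --bootstrap-server xxx.eu-west-1.aws.confluent.cloud:9092 \
  --command-config consumer.properties \
  --delete --group replicator
```

But I'm not sure whether that group is supposed to exist in the destination
cluster or not.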
Not sure if it matters, but both the consumer and the producer use the same
bootstrap server; I think the clusters are separated based on the login.
Is there anything I can try to solve this? Replicator does seem to be moving
messages, but the rate is < 1 MB/s, so it's probably going much slower than
it should.
Thank you in advance
--
Alessandro Tagliapietra