Posted to jira@kafka.apache.org by "Andrey Polyakov (Jira)" <ji...@apache.org> on 2021/05/06 23:19:00 UTC
[jira] [Created] (KAFKA-12759) Kafka consumers with static group membership won't consume from newly subscribed topics
Andrey Polyakov created KAFKA-12759:
---------------------------------------
Summary: Kafka consumers with static group membership won't consume from newly subscribed topics
Key: KAFKA-12759
URL: https://issues.apache.org/jira/browse/KAFKA-12759
Project: Kafka
Issue Type: Bug
Affects Versions: 2.8.0
Reporter: Andrey Polyakov
We've recently started using static group membership and noticed that when a new topic is added to the subscription, the new topic is never consumed from, regardless of how long the consumer is left running. Our workaround is to shut down all consumers in the group for longer than session.timeout.ms, then start them back up. Is this expected behaviour or a bug?
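For reference, a minimal sketch of what the workaround amounts to for a single instance (reusing props from the sample application below, and assuming its 300000 ms session.timeout.ms; in a real deployment every consumer in the group is stopped before any is restarted):
{code:java}
// Sketch of the workaround, not a fix: close the consumer, wait out
// session.timeout.ms so the coordinator drops the static member, then
// restart with the new subscription.
consumer.close();
try {
  Thread.sleep(300_000L + 5_000L); // longer than session.timeout.ms (300000 ms here)
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
}
KafkaConsumer<byte[], byte[]> restarted = new KafkaConsumer<>(props);
restarted.subscribe(Arrays.asList("topic1", "topic2")); // new topic can now be assigned after rejoin
{code}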
Sample application:
{code:java}
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class Test {
  static volatile boolean shutdown = false;
  static final Object shutdownLock = new Object();

  public static void main(String[] args) {
    // Shutdown hook: signal the poll loop to stop, then wait until the
    // consumer has been closed before letting the JVM exit.
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  shutdown = true;
                  synchronized (shutdownLock) {
                    try {
                      shutdownLock.wait();
                    } catch (InterruptedException e) {
                      e.printStackTrace();
                    }
                  }
                }));

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        ByteArrayDeserializer.class.getCanonicalName());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        ByteArrayDeserializer.class.getCanonicalName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000"); // 5 min
    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance1"); // static membership

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("topic1"));
    // consumer.subscribe(Arrays.asList("topic1", "topic2"));

    while (!shutdown) {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
      System.out.println("poll() returned " + records.count() + " records");
    }

    System.out.println("Closing consumer");
    consumer.close();
    synchronized (shutdownLock) {
      shutdownLock.notifyAll();
      System.out.println("Done closing consumer");
    }
  }
}
{code}
Steps to reproduce:
0. update the bootstrap server config in the example code
1. run the above application, which consumes from topic1
2. send SIGTERM to the process, cleanly closing the consumer
3. modify the code to consume from topic1 AND topic2 (the one-line change shown below)
4. run the application again and observe that both topics appear in the logs as part of the subscription, but the new topic (topic2) is never assigned, regardless of how long you let the consumer run.
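The only code difference between the two runs is the subscription line (already present, commented out, in the sample above):
{code:java}
// First run: single-topic subscription
consumer.subscribe(Arrays.asList("topic1"));
// Second run: the new topic is added to the subscription
consumer.subscribe(Arrays.asList("topic1", "topic2"));
{code}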
Logs from first run (1 topic subscription):
{code:java}
ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-myGroupID-instance1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = myGroupID
group.instance.id = instance1
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 300000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Kafka version: 2.8.0
Kafka commitId: ebb1d6e21cc92130
Kafka startTimeMs: 1620342287841
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Subscribed to topic(s): topic1
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Cluster ID: AHajHZ_3QqyBJlGdr9Gwwg
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Discovered group coordinator 10.86.24.3:9092 (id: 2147483646 rack: null)
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] (Re-)joining group
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully joined group with generation Generation{generationId=1, memberId='instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Finished assignment for group at generation 1: {instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03=Assignment(partitions=[topic1-0])}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully synced group in generation Generation{generationId=1, memberId='instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Notifying assignor about the new Assignment(partitions=[topic1-0])
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Adding newly assigned partitions: topic1-0
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Found no committed offset for partition topic1-0
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Resetting offset for partition topic1-0 to position FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.86.31.131:9092 (id: 2 rack: null)], epoch=0}}.
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
Closing consumer
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Revoke previously assigned partitions topic1-0
Metrics scheduler closed
Closing reporter org.apache.kafka.common.metrics.JmxReporter
Metrics reporters closed
App info kafka.consumer for consumer-myGroupID-instance1 unregistered
Done closing consumer
Process finished with exit code 130 (interrupted by signal 2: SIGINT)
{code}
Logs from second run (2 topic subscription):
{code:java}
ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-myGroupID-instance1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = myGroupID
group.instance.id = instance1
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 300000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Kafka version: 2.8.0
Kafka commitId: ebb1d6e21cc92130
Kafka startTimeMs: 1620342351702
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Subscribed to topic(s): topic1, topic2
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Cluster ID: AHajHZ_3QqyBJlGdr9Gwwg
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Discovered group coordinator 10.86.24.3:9092 (id: 2147483646 rack: null)
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] (Re-)joining group
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully joined group with generation Generation{generationId=1, memberId='instance1-9fdf45a9-04bc-4056-91ec-07d5a64a2a86', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully synced group in generation Generation{generationId=1, memberId='instance1-9fdf45a9-04bc-4056-91ec-07d5a64a2a86', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Notifying assignor about the new Assignment(partitions=[topic1-0])
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Adding newly assigned partitions: topic1-0
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Setting offset for partition topic1-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.86.31.131:9092 (id: 2 rack: null)], epoch=0}}
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
Closing consumer
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Revoke previously assigned partitions topic1-0
Metrics scheduler closed
Closing reporter org.apache.kafka.common.metrics.JmxReporter
Metrics reporters closed
App info kafka.consumer for consumer-myGroupID-instance1 unregistered
Done closing consumer
Process finished with exit code 130 (interrupted by signal 2: SIGINT)
{code}