Posted to jira@kafka.apache.org by "Andrey Polyakov (Jira)" <ji...@apache.org> on 2021/05/06 23:19:00 UTC

[jira] [Created] (KAFKA-12759) Kafka consumers with static group membership won't consume from newly subscribed topics

Andrey Polyakov created KAFKA-12759:
---------------------------------------

             Summary: Kafka consumers with static group membership won't consume from newly subscribed topics
                 Key: KAFKA-12759
                 URL: https://issues.apache.org/jira/browse/KAFKA-12759
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.8.0
            Reporter: Andrey Polyakov


We recently started using static group membership and noticed that when a new topic is added to an existing subscription, the new topic is never consumed from, regardless of how long the consumer is left to run. Our workaround is to shut down all consumers in the group for longer than session.timeout.ms, then start them back up. Is this expected behaviour or a bug?
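
A possibly lighter-weight workaround (untested here, so treat it as a sketch rather than a confirmed fix) would be to remove the static member explicitly with the admin client's removeMembersFromConsumerGroup before restarting with the new subscription, instead of waiting out session.timeout.ms. The group ID and group.instance.id below match the sample application:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

public class RemoveStaticMember {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
      // Ask the coordinator to forget the static member identified by its
      // group.instance.id, so the next restart joins as a "new" member.
      admin
          .removeMembersFromConsumerGroup(
              "myGroupID",
              new RemoveMembersFromConsumerGroupOptions(
                  Collections.singletonList(new MemberToRemove("instance1"))))
          .all()
          .get();
    }
  }
}
{code}
Whether the coordinator then computes a fresh assignment for the returning member is exactly the open question of this ticket.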

Sample application:
{code:java}
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class Test {
  static volatile boolean shutdown = false;
  static volatile boolean consumerClosed = false;
  static final Object shutdownLock = new Object();

  public static void main(String[] args) {
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  shutdown = true;
                  // Block JVM shutdown until the consumer has been closed.
                  // The consumerClosed flag guards against a missed notify if
                  // close() finishes before this hook starts waiting.
                  synchronized (shutdownLock) {
                    while (!consumerClosed) {
                      try {
                        shutdownLock.wait();
                      } catch (InterruptedException e) {
                        e.printStackTrace();
                      }
                    }
                  }
                }));

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        ByteArrayDeserializer.class.getCanonicalName());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        ByteArrayDeserializer.class.getCanonicalName());

    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000"); // 5 min
    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance1");

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Arrays.asList("topic1"));
    // consumer.subscribe(Arrays.asList("topic1", "topic2"));

    while (!shutdown) {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
      System.out.println("poll() returned " + records.count() + " records");
    }

    System.out.println("Closing consumer");
    consumer.close();
    synchronized (shutdownLock) {
      consumerClosed = true;
      shutdownLock.notifyAll();
      System.out.println("Done closing consumer");
    }
  }
}
{code}
Steps to reproduce:
 0. update the bootstrap server config in the example code
 1. run the application above, which consumes from topic1
 2. send SIGTERM to the process, cleanly closing the consumer
 3. modify the code to consume from topic1 AND topic2
 4. run the application again, and see that both topics appear in the logs as part of the subscription, but partitions of topic2 are never assigned, regardless of how long the consumer runs.
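
The steps assume topic1 and topic2 already exist on the broker. If they do not, they can be created up front; a minimal sketch using the admin client (single-partition topics and a replication factor of 1 are assumptions for a local test broker):
{code:java}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTestTopics {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
      // One partition each, matching the topic1-0 assignment in the logs below.
      admin
          .createTopics(
              Arrays.asList(
                  new NewTopic("topic1", 1, (short) 1),
                  new NewTopic("topic2", 1, (short) 1)))
          .all()
          .get();
    }
  }
}
{code}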

Logs from the first run (1-topic subscription):
{code:java}
ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-myGroupID-instance1
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = myGroupID
	group.instance.id = instance1
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 300000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Kafka version: 2.8.0
Kafka commitId: ebb1d6e21cc92130
Kafka startTimeMs: 1620342287841
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Subscribed to topic(s): topic1
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Cluster ID: AHajHZ_3QqyBJlGdr9Gwwg
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Discovered group coordinator 10.86.24.3:9092 (id: 2147483646 rack: null)
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] (Re-)joining group
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully joined group with generation Generation{generationId=1, memberId='instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Finished assignment for group at generation 1: {instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03=Assignment(partitions=[topic1-0])}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully synced group in generation Generation{generationId=1, memberId='instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Notifying assignor about the new Assignment(partitions=[topic1-0])
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Adding newly assigned partitions: topic1-0
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Found no committed offset for partition topic1-0
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Resetting offset for partition topic1-0 to position FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.86.31.131:9092 (id: 2 rack: null)], epoch=0}}.
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
Closing consumer
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Revoke previously assigned partitions topic1-0
Metrics scheduler closed
Closing reporter org.apache.kafka.common.metrics.JmxReporter
Metrics reporters closed
App info kafka.consumer for consumer-myGroupID-instance1 unregistered
Done closing consumer

Process finished with exit code 130 (interrupted by signal 2: SIGINT)
{code}
Logs from the second run (2-topic subscription):
{code:java}
ConsumerConfig values: (identical to the first run; omitted)

Kafka version: 2.8.0
Kafka commitId: ebb1d6e21cc92130
Kafka startTimeMs: 1620342351702
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Subscribed to topic(s): topic1, topic2
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Cluster ID: AHajHZ_3QqyBJlGdr9Gwwg
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Discovered group coordinator 10.86.24.3:9092 (id: 2147483646 rack: null)
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] (Re-)joining group
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully joined group with generation Generation{generationId=1, memberId='instance1-9fdf45a9-04bc-4056-91ec-07d5a64a2a86', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully synced group in generation Generation{generationId=1, memberId='instance1-9fdf45a9-04bc-4056-91ec-07d5a64a2a86', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Notifying assignor about the new Assignment(partitions=[topic1-0])
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Adding newly assigned partitions: topic1-0
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Setting offset for partition topic1-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.86.31.131:9092 (id: 2 rack: null)], epoch=0}}
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
Closing consumer
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Revoke previously assigned partitions topic1-0
Metrics scheduler closed
Closing reporter org.apache.kafka.common.metrics.JmxReporter
Metrics reporters closed
App info kafka.consumer for consumer-myGroupID-instance1 unregistered
Done closing consumer

Process finished with exit code 130 (interrupted by signal 2: SIGINT)
{code}
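
Note that both runs show generationId=1, and the second run is missing the "Finished assignment for group at generation 1" line that appears in the first: the coordinator appears to hand the returning static member its cached assignment (topic1-0 only) rather than computing a new one for the changed subscription. One way to inspect what the coordinator has recorded for the member is to describe the group with the admin client; a minimal diagnostic sketch (not part of the original report):
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

public class DescribeGroup {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
      ConsumerGroupDescription description =
          admin
              .describeConsumerGroups(Collections.singletonList("myGroupID"))
              .describedGroups()
              .get("myGroupID")
              .get();
      // If the bug is present, instance1 should still show only topic1-0
      // even though the consumer subscribed to topic1 and topic2.
      description
          .members()
          .forEach(
              m ->
                  System.out.println(
                      m.groupInstanceId().orElse("<dynamic>")
                          + " -> "
                          + m.assignment().topicPartitions()));
    }
  }
}
{code}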


