Posted to commits@kafka.apache.org by da...@apache.org on 2023/11/15 13:00:47 UTC
(kafka) branch trunk updated: KAFKA-15277: Design & implement support for internal Consumer delegates (#14670)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22f7ffe5e16 KAFKA-15277: Design & implement support for internal Consumer delegates (#14670)
22f7ffe5e16 is described below
commit 22f7ffe5e1623d279096b45ab475768eeb05eee1
Author: Kirk True <ki...@kirktrue.pro>
AuthorDate: Wed Nov 15 05:00:40 2023 -0800
KAFKA-15277: Design & implement support for internal Consumer delegates (#14670)
The consumer refactoring project introduced another `Consumer` implementation, creating two different, coexisting implementations of the `Consumer` interface:
* `KafkaConsumer` (AKA "existing", "legacy" consumer)
* `PrototypeAsyncConsumer` (AKA "new", "refactored" consumer)
The goal of this task is to refactor the code via the delegation pattern so that we keep a top-level `KafkaConsumer` that delegates to one of the internal implementations under the covers. There will be two delegates at first:
* `LegacyKafkaConsumer`
* `AsyncKafkaConsumer`
`LegacyKafkaConsumer` is essentially a renamed `KafkaConsumer`; it handles the existing group protocol. `AsyncKafkaConsumer` is renamed from `PrototypeAsyncConsumer` and will implement the new consumer group protocol from KIP-848. Both implementations will live in the `internals` sub-package to discourage their direct use.
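To make the shape of the change concrete, here is a minimal sketch of the delegation pattern the diff below introduces. The trimmed method list is illustrative only; the real `ConsumerDelegate` added by this commit mirrors the full `Consumer` API plus a few package-private hooks used by tests, and the class name below is hypothetical.

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Set;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative subset of the delegate contract; the real ConsumerDelegate
    // mirrors the full Consumer<K, V> API.
    interface ConsumerDelegate<K, V> {
        Set<TopicPartition> assignment();
        void subscribe(Collection<String> topics);
        ConsumerRecords<K, V> poll(Duration timeout);
        void unsubscribe();
        void close();
    }

    // The public KafkaConsumer keeps its API but becomes a thin shell that
    // forwards every call to whichever delegate was created for it.
    // (Class name is hypothetical; compare KafkaConsumer in the diff below.)
    final class DelegatingConsumerSketch<K, V> {
        private final ConsumerDelegate<K, V> delegate;

        DelegatingConsumerSketch(ConsumerDelegate<K, V> delegate) {
            this.delegate = delegate;
        }

        public Set<TopicPartition> assignment() {
            return delegate.assignment();
        }

        public ConsumerRecords<K, V> poll(Duration timeout) {
            return delegate.poll(timeout);
        }

        public void close() {
            delegate.close();
        }
    }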
This task is part of the work to implement support for the new KIP-848 consumer group protocol.
Reviewers: Philip Nee <pn...@confluent.io>, Andrew Schofield <as...@confluent.io>, David Jacot <dj...@confluent.io>
---
.../kafka/clients/consumer/ConsumerConfig.java | 13 +-
.../kafka/clients/consumer/KafkaConsumer.java | 971 ++-------------
...eAsyncConsumer.java => AsyncKafkaConsumer.java} | 203 ++--
.../consumer/internals/CommitRequestManager.java | 3 +-
.../consumer/internals/ConsumerDelegate.java | 42 +
.../internals/ConsumerDelegateCreator.java | 115 ++
.../consumer/internals/ConsumerNetworkThread.java | 10 +-
.../clients/consumer/internals/ConsumerUtils.java | 6 +
.../consumer/internals/LegacyKafkaConsumer.java | 1257 ++++++++++++++++++++
.../kafka/clients/consumer/ConsumerConfigTest.java | 31 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 1211 +++++++++++--------
...nsumerTest.java => AsyncKafkaConsumerTest.java} | 8 +-
.../internals/CommitRequestManagerTest.java | 6 +
.../consumer/internals/ConsumerTestBuilder.java | 8 +-
.../kafka/api/BaseAsyncConsumerTest.scala | 10 +-
.../kafka/api/IntegrationTestHarness.scala | 14 -
gradle/spotbugs-exclude.xml | 10 +
.../integration/TaskAssignorIntegrationTest.java | 11 +-
18 files changed, 2434 insertions(+), 1495 deletions(-)
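Of the new files above, `ConsumerDelegateCreator` is the factory that decides which delegate backs a `KafkaConsumer`. Its body is not reproduced in this message, so the following is only a hedged sketch: it assumes the KIP-848 `group.protocol` configuration (values `classic` and `consumer`) drives the choice, which may not match the actual selection criteria.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Deserializer;

    // Hypothetical sketch only: the real ConsumerDelegateCreator (115 new lines
    // in this commit) is not shown in this excerpt, and the "group.protocol"
    // key used below is an assumption, not a confirmed detail of this change.
    final class ConsumerDelegateCreatorSketch {
        <K, V> ConsumerDelegate<K, V> create(ConsumerConfig config,
                                             Deserializer<K> keyDeserializer,
                                             Deserializer<V> valueDeserializer) {
            String protocol = config.getString("group.protocol"); // assumed config key
            if ("consumer".equalsIgnoreCase(protocol))
                return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
            return new LegacyKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
        }
    }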
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 43bd2eb1741..213fa3ee52b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -662,6 +662,7 @@ public class ConsumerConfig extends AbstractConfig {
CommonClientConfigs.warnDisablingExponentialBackoff(this);
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
maybeOverrideClientId(refinedConfigs);
+ maybeOverrideEnableAutoCommit(refinedConfigs);
return refinedConfigs;
}
@@ -695,17 +696,17 @@ public class ConsumerConfig extends AbstractConfig {
return newConfigs;
}
- boolean maybeOverrideEnableAutoCommit() {
+ private void maybeOverrideEnableAutoCommit(Map<String, Object> configs) {
Optional<String> groupId = Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
- boolean enableAutoCommit = getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+ Map<String, Object> originals = originals();
+ boolean enableAutoCommit = originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG) ? getBoolean(ENABLE_AUTO_COMMIT_CONFIG) : false;
if (!groupId.isPresent()) { // overwrite in case of default group id where the config is not explicitly provided
- if (!originals().containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
- enableAutoCommit = false;
+ if (!originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
+ configs.put(ENABLE_AUTO_COMMIT_CONFIG, false);
} else if (enableAutoCommit) {
- throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
+ throw new InvalidConfigurationException(ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
}
}
- return enableAutoCommit;
}
public ConsumerConfig(Properties props) {
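The hunk above turns `maybeOverrideEnableAutoCommit` from a package-private getter (previously invoked from the `KafkaConsumer` constructor) into a config post-processing step that rewrites `enable.auto.commit` in the returned config map. A short usage sketch of the resulting behavior, with a hypothetical class name:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.errors.InvalidConfigurationException;

    public class EnableAutoCommitExample {
        public static void main(String[] args) {
            // No group.id configured: enable.auto.commit is overridden to false
            // during config post-processing instead of at consumer construction.
            Map<String, Object> configs = new HashMap<>();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            ConsumerConfig config = new ConsumerConfig(configs);
            System.out.println(config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); // false

            // No group.id, but enable.auto.commit=true set explicitly: the
            // config is now rejected while it is being constructed.
            configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            try {
                new ConsumerConfig(configs);
            } catch (InvalidConfigurationException e) {
                System.out.println(e.getMessage());
            }
        }
    }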
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index bd795e033ab..82e1f8b93da 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -16,26 +16,12 @@
*/
package org.apache.kafka.clients.consumer;
-import org.apache.kafka.clients.ApiVersions;
-import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.GroupRebalanceConfig;
-import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
-import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
+import org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.clients.consumer.internals.Deserializers;
-import org.apache.kafka.clients.consumer.internals.Fetch;
-import org.apache.kafka.clients.consumer.internals.FetchConfig;
-import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
-import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
-import org.apache.kafka.clients.consumer.internals.OffsetFetcher;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.clients.consumer.internals.TopicMetadataFetcher;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -43,52 +29,23 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.errors.InvalidGroupIdException;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
-import org.slf4j.Logger;
-import org.slf4j.event.Level;
-import java.net.InetSocketAddress;
import java.time.Duration;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
-import static org.apache.kafka.common.utils.Utils.closeQuietly;
-import static org.apache.kafka.common.utils.Utils.isBlank;
-import static org.apache.kafka.common.utils.Utils.join;
import static org.apache.kafka.common.utils.Utils.propsToMap;
-import static org.apache.kafka.common.utils.Utils.swallow;
/**
* A client that consumes records from a Kafka cluster.
@@ -468,8 +425,7 @@ import static org.apache.kafka.common.utils.Utils.swallow;
*
* <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
*
- * The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
- * making the call. It is the responsibility of the user to ensure that multi-threaded access
+ * The Kafka consumer is NOT thread-safe. It is the responsibility of the user to ensure that multi-threaded access
* is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
*
* <p>
@@ -567,43 +523,9 @@ import static org.apache.kafka.common.utils.Utils.swallow;
*/
public class KafkaConsumer<K, V> implements Consumer<K, V> {
- private static final long NO_CURRENT_THREAD = -1L;
- static final String DEFAULT_REASON = "rebalance enforced by user";
-
- // Visible for testing
- final Metrics metrics;
- final KafkaConsumerMetrics kafkaConsumerMetrics;
-
- private Logger log;
- private final String clientId;
- private final Optional<String> groupId;
- private final ConsumerCoordinator coordinator;
- private final Deserializers<K, V> deserializers;
- private final Fetcher<K, V> fetcher;
- private final OffsetFetcher offsetFetcher;
- private final TopicMetadataFetcher topicMetadataFetcher;
- private final ConsumerInterceptors<K, V> interceptors;
- private final IsolationLevel isolationLevel;
-
- private final Time time;
- private final ConsumerNetworkClient client;
- private final SubscriptionState subscriptions;
- private final ConsumerMetadata metadata;
- private final long retryBackoffMs;
- private final long retryBackoffMaxMs;
- private final long requestTimeoutMs;
- private final int defaultApiTimeoutMs;
- private volatile boolean closed = false;
- private final List<ConsumerPartitionAssignor> assignors;
-
- // currentThread holds the threadId of the current thread accessing KafkaConsumer
- // and is used to prevent multi-threaded access
- private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
- // refcount is used to allow reentrant access by the thread who has acquired currentThread
- private final AtomicInteger refcount = new AtomicInteger(0);
-
- // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
- private boolean cachedSubscriptionHasAllFetchPositions;
+ private final static ConsumerDelegateCreator CREATOR = new ConsumerDelegateCreator();
+
+ private final ConsumerDelegate<K, V> delegate;
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -674,165 +596,30 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
keyDeserializer, valueDeserializer);
}
- @SuppressWarnings("unchecked")
KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
- try {
- GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
- GroupRebalanceConfig.ProtocolType.CONSUMER);
-
- this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
- this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
- LogContext logContext = createLogContext(config, groupRebalanceConfig);
- this.log = logContext.logger(getClass());
- boolean enableAutoCommit = config.maybeOverrideEnableAutoCommit();
- groupId.ifPresent(groupIdStr -> {
- if (groupIdStr.isEmpty()) {
- log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
- }
- });
-
- log.debug("Initializing the Kafka consumer");
- this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
- this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
- this.time = Time.SYSTEM;
- this.metrics = createMetrics(config, time);
- this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
- this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
-
- List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
- this.interceptors = new ConsumerInterceptors<>(interceptorList);
- this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
- this.subscriptions = createSubscriptionState(config, logContext);
- ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
- metrics.reporters(),
- interceptorList,
- Arrays.asList(this.deserializers.keyDeserializer, this.deserializers.valueDeserializer));
- this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
- List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
- this.metadata.bootstrap(addresses);
-
- FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
- FetchConfig fetchConfig = new FetchConfig(config);
- this.isolationLevel = fetchConfig.isolationLevel;
-
- ApiVersions apiVersions = new ApiVersions();
- this.client = createConsumerNetworkClient(config,
- metrics,
- logContext,
- apiVersions,
- time,
- metadata,
- fetchMetricsManager.throttleTimeSensor(),
- retryBackoffMs);
-
- this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
- config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
- config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
- );
-
- // no coordinator will be constructed for the default (null) group id
- if (!groupId.isPresent()) {
- config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
- config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
- this.coordinator = null;
- } else {
- this.coordinator = new ConsumerCoordinator(groupRebalanceConfig,
- logContext,
- this.client,
- assignors,
- this.metadata,
- this.subscriptions,
- metrics,
- CONSUMER_METRIC_GROUP_PREFIX,
- this.time,
- enableAutoCommit,
- config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
- this.interceptors,
- config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
- config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
- }
- this.fetcher = new Fetcher<>(
- logContext,
- this.client,
- this.metadata,
- this.subscriptions,
- fetchConfig,
- this.deserializers,
- fetchMetricsManager,
- this.time,
- apiVersions);
- this.offsetFetcher = new OffsetFetcher(logContext,
- client,
- metadata,
- subscriptions,
- time,
- retryBackoffMs,
- requestTimeoutMs,
- isolationLevel,
- apiVersions);
- this.topicMetadataFetcher = new TopicMetadataFetcher(logContext,
- client,
- retryBackoffMs,
- retryBackoffMaxMs);
-
- this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);
-
- config.logUnused();
- AppInfoParser.registerAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics, time.milliseconds());
- log.debug("Kafka consumer initialized");
- } catch (Throwable t) {
- // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
- // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
- if (this.log != null) {
- close(Duration.ZERO, true);
- }
- // now propagate the exception
- throw new KafkaException("Failed to construct kafka consumer", t);
- }
+ delegate = CREATOR.create(config, keyDeserializer, valueDeserializer);
}
- // visible for testing
KafkaConsumer(LogContext logContext,
- String clientId,
- ConsumerCoordinator coordinator,
+ Time time,
+ ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
- Fetcher<K, V> fetcher,
- OffsetFetcher offsetFetcher,
- TopicMetadataFetcher topicMetadataFetcher,
- ConsumerInterceptors<K, V> interceptors,
- Time time,
- ConsumerNetworkClient client,
- Metrics metrics,
+ KafkaClient client,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
- long retryBackoffMs,
- long retryBackoffMaxMs,
- long requestTimeoutMs,
- int defaultApiTimeoutMs,
- List<ConsumerPartitionAssignor> assignors,
- String groupId) {
- this.log = logContext.logger(getClass());
- this.clientId = clientId;
- this.coordinator = coordinator;
- this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
- this.fetcher = fetcher;
- this.offsetFetcher = offsetFetcher;
- this.topicMetadataFetcher = topicMetadataFetcher;
- this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
- this.interceptors = Objects.requireNonNull(interceptors);
- this.time = time;
- this.client = client;
- this.metrics = metrics;
- this.subscriptions = subscriptions;
- this.metadata = metadata;
- this.retryBackoffMs = retryBackoffMs;
- this.retryBackoffMaxMs = retryBackoffMaxMs;
- this.requestTimeoutMs = requestTimeoutMs;
- this.defaultApiTimeoutMs = defaultApiTimeoutMs;
- this.assignors = assignors;
- this.groupId = Optional.ofNullable(groupId);
- this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
+ List<ConsumerPartitionAssignor> assignors) {
+ delegate = CREATOR.create(
+ logContext,
+ time,
+ config,
+ keyDeserializer,
+ valueDeserializer,
+ client,
+ subscriptions,
+ metadata,
+ assignors
+ );
}
/**
@@ -844,12 +631,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The set of partitions currently assigned to this consumer
*/
public Set<TopicPartition> assignment() {
- acquireAndEnsureOpen();
- try {
- return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
- } finally {
- release();
- }
+ return delegate.assignment();
}
/**
@@ -858,12 +640,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The set of topics currently subscribed to
*/
public Set<String> subscription() {
- acquireAndEnsureOpen();
- try {
- return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.subscription()));
- } finally {
- release();
- }
+ return delegate.subscription();
}
/**
@@ -903,10 +680,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
- if (listener == null)
- throw new IllegalArgumentException("RebalanceListener cannot be null");
-
- subscribe(topics, Optional.of(listener));
+ delegate.subscribe(topics, listener);
}
/**
@@ -932,63 +706,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void subscribe(Collection<String> topics) {
- subscribe(topics, Optional.empty());
- }
-
- /**
- * Internal helper method for {@link #subscribe(Collection)} and
- * {@link #subscribe(Collection, ConsumerRebalanceListener)}
- * <p>
- * Subscribe to the given list of topics to get dynamically assigned partitions.
- * <b>Topic subscriptions are not incremental. This list will replace the current
- * assignment (if there is one).</b> It is not possible to combine topic subscription with group management
- * with manual partition assignment through {@link #assign(Collection)}.
- *
- * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
- *
- * <p>
- * @param topics The list of topics to subscribe to
- * @param listener {@link Optional} listener instance to get notifications on partition assignment/revocation
- * for the subscribed topics
- * @throws IllegalArgumentException If topics is null or contains null or empty elements
- * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
- * previously (without a subsequent call to {@link #unsubscribe()}), or if not
- * configured at-least one partition assignment strategy
- */
- private void subscribe(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
- acquireAndEnsureOpen();
- try {
- maybeThrowInvalidGroupIdException();
- if (topics == null)
- throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
- if (topics.isEmpty()) {
- // treat subscribing to empty topic list as the same as unsubscribing
- this.unsubscribe();
- } else {
- for (String topic : topics) {
- if (isBlank(topic))
- throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
- }
-
- throwIfNoAssignorsConfigured();
-
- // Clear the buffered data which are not a part of newly assigned topics
- final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
-
- for (TopicPartition tp : subscriptions.assignedPartitions()) {
- if (topics.contains(tp.topic()))
- currentTopicPartitions.add(tp);
- }
-
- fetcher.clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
-
- log.info("Subscribed to topic(s): {}", join(topics, ", "));
- if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
- metadata.requestUpdateForNewTopics();
- }
- } finally {
- release();
- }
+ delegate.subscribe(topics);
}
/**
@@ -1012,10 +730,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
- if (listener == null)
- throw new IllegalArgumentException("RebalanceListener cannot be null");
-
- subscribe(pattern, Optional.of(listener));
+ delegate.subscribe(pattern, listener);
}
/**
@@ -1036,47 +751,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void subscribe(Pattern pattern) {
- subscribe(pattern, Optional.empty());
- }
-
- /**
- * Internal helper method for {@link #subscribe(Pattern)} and
- * {@link #subscribe(Pattern, ConsumerRebalanceListener)}
- * <p>
- * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
- * The pattern matching will be done periodically against all topics existing at the time of check.
- * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering
- * the max metadata age, the consumer will refresh metadata more often and check for matching topics.
- * <p>
- * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the
- * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there
- * is a change to the topics matching the provided pattern and when consumer group membership changes.
- * Group rebalances only take place during an active call to {@link #poll(Duration)}.
- *
- * @param pattern Pattern to subscribe to
- * @param listener {@link Optional} listener instance to get notifications on partition assignment/revocation
- * for the subscribed topics
- * @throws IllegalArgumentException If pattern or listener is null
- * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
- * previously (without a subsequent call to {@link #unsubscribe()}), or if not
- * configured at-least one partition assignment strategy
- */
- private void subscribe(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
- maybeThrowInvalidGroupIdException();
- if (pattern == null || pattern.toString().equals(""))
- throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
- "null" : "empty"));
-
- acquireAndEnsureOpen();
- try {
- throwIfNoAssignorsConfigured();
- log.info("Subscribed to pattern: '{}'", pattern);
- this.subscriptions.subscribe(pattern, listener);
- this.coordinator.updatePatternSubscription(metadata.fetch());
- this.metadata.requestUpdateForNewTopics();
- } finally {
- release();
- }
+ delegate.subscribe(pattern);
}
/**
@@ -1086,18 +761,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. rebalance callback errors)
*/
public void unsubscribe() {
- acquireAndEnsureOpen();
- try {
- fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
- if (this.coordinator != null) {
- this.coordinator.onLeavePrepare();
- this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
- }
- this.subscriptions.unsubscribe();
- log.info("Unsubscribed all topics or patterns and assigned partitions");
- } finally {
- release();
- }
+ delegate.unsubscribe();
}
/**
@@ -1121,32 +785,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void assign(Collection<TopicPartition> partitions) {
- acquireAndEnsureOpen();
- try {
- if (partitions == null) {
- throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
- } else if (partitions.isEmpty()) {
- this.unsubscribe();
- } else {
- for (TopicPartition tp : partitions) {
- String topic = (tp != null) ? tp.topic() : null;
- if (isBlank(topic))
- throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
- }
- fetcher.clearBufferedDataForUnassignedPartitions(partitions);
-
- // make sure the offsets of topic partitions the consumer is unsubscribing from
- // are committed since there will be no following rebalance
- if (coordinator != null)
- this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
-
- log.info("Assigned to partition(s): {}", join(partitions, ", "));
- if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
- metadata.requestUpdateForNewTopics();
- }
- } finally {
- release();
- }
+ delegate.assign(partitions);
}
/**
@@ -1185,7 +824,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Deprecated
@Override
public ConsumerRecords<K, V> poll(final long timeoutMs) {
- return poll(time.timer(timeoutMs), false);
+ return delegate.poll(timeoutMs);
}
/**
@@ -1232,110 +871,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
- return poll(time.timer(timeout), true);
- }
-
- /**
- * @throws KafkaException if the rebalance callback throws exception
- */
- private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
- acquireAndEnsureOpen();
- try {
- this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
-
- if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
- throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
- }
-
- do {
- client.maybeTriggerWakeup();
-
- if (includeMetadataInTimeout) {
- // try to update assignment metadata BUT do not need to block on the timer for join group
- updateAssignmentMetadataIfNeeded(timer, false);
- } else {
- while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
- log.warn("Still waiting for metadata");
- }
- }
-
- final Fetch<K, V> fetch = pollForFetches(timer);
- if (!fetch.isEmpty()) {
- // before returning the fetched records, we can send off the next round of fetches
- // and avoid block waiting for their responses to enable pipelining while the user
- // is handling the fetched records.
- //
- // NOTE: since the consumed position has already been updated, we must not allow
- // wakeups or any other errors to be triggered prior to returning the fetched records.
- if (sendFetches() > 0 || client.hasPendingRequests()) {
- client.transmitSends();
- }
-
- if (fetch.records().isEmpty()) {
- log.trace("Returning empty records from `poll()` "
- + "since the consumer's position has advanced for at least one topic partition");
- }
-
- return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
- }
- } while (timer.notExpired());
-
- return ConsumerRecords.empty();
- } finally {
- release();
- this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
- }
- }
-
- private int sendFetches() {
- offsetFetcher.validatePositionsOnMetadataChange();
- return fetcher.sendFetches();
- }
-
- boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
- if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
- return false;
- }
-
- return updateFetchPositions(timer);
- }
-
- /**
- * @throws KafkaException if the rebalance callback throws exception
- */
- private Fetch<K, V> pollForFetches(Timer timer) {
- long pollTimeout = coordinator == null ? timer.remainingMs() :
- Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
-
- // if data is available already, return it immediately
- final Fetch<K, V> fetch = fetcher.collectFetch();
- if (!fetch.isEmpty()) {
- return fetch;
- }
-
- // send any new fetches (won't resend pending fetches)
- sendFetches();
-
- // We do not want to be stuck blocking in poll if we are missing some positions
- // since the offset lookup may be backing off after a failure
-
- // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
- // updateAssignmentMetadataIfNeeded before this method.
- if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
- pollTimeout = retryBackoffMs;
- }
-
- log.trace("Polling for fetches with timeout {}", pollTimeout);
-
- Timer pollTimer = time.timer(pollTimeout);
- client.poll(pollTimer, () -> {
- // since a fetch might be completed by the background thread, we need this poll condition
- // to ensure that we do not block unnecessarily in poll()
- return !fetcher.hasAvailableFetches();
- });
- timer.update(pollTimer.currentTimeMs());
-
- return fetcher.collectFetch();
+ return delegate.poll(timeout);
}
/**
@@ -1379,7 +915,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void commitSync() {
- commitSync(Duration.ofMillis(defaultApiTimeoutMs));
+ delegate.commitSync();
}
/**
@@ -1422,7 +958,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void commitSync(Duration timeout) {
- commitSync(subscriptions.allConsumed(), timeout);
+ delegate.commitSync(timeout);
}
/**
@@ -1470,7 +1006,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
- commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
+ delegate.commitSync(offsets);
}
/**
@@ -1518,19 +1054,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
- acquireAndEnsureOpen();
- long commitStart = time.nanoseconds();
- try {
- maybeThrowInvalidGroupIdException();
- offsets.forEach(this::updateLastSeenEpochIfNewer);
- if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
- throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
- "committing offsets " + offsets);
- }
- } finally {
- kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
- release();
- }
+ delegate.commitSync(offsets, timeout);
}
/**
@@ -1540,7 +1064,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void commitAsync() {
- commitAsync(null);
+ delegate.commitAsync();
}
/**
@@ -1563,7 +1087,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void commitAsync(OffsetCommitCallback callback) {
- commitAsync(subscriptions.allConsumed(), callback);
+ delegate.commitAsync(callback);
}
/**
@@ -1590,15 +1114,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
- acquireAndEnsureOpen();
- try {
- maybeThrowInvalidGroupIdException();
- log.debug("Committing offsets: {}", offsets);
- offsets.forEach(this::updateLastSeenEpochIfNewer);
- coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
- } finally {
- release();
- }
+ delegate.commitAsync(offsets, callback);
}
/**
@@ -1632,20 +1148,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void seek(TopicPartition partition, long offset) {
- if (offset < 0)
- throw new IllegalArgumentException("seek offset must not be a negative number");
-
- acquireAndEnsureOpen();
- try {
- log.info("Seeking to offset {} for partition {}", offset, partition);
- SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
- offset,
- Optional.empty(), // This will ensure we skip validation
- this.metadata.currentLeader(partition));
- this.subscriptions.seekUnvalidated(partition, newPosition);
- } finally {
- release();
- }
+ delegate.seek(partition, offset);
}
/**
@@ -1659,29 +1162,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
- long offset = offsetAndMetadata.offset();
- if (offset < 0) {
- throw new IllegalArgumentException("seek offset must not be a negative number");
- }
-
- acquireAndEnsureOpen();
- try {
- if (offsetAndMetadata.leaderEpoch().isPresent()) {
- log.info("Seeking to offset {} for partition {} with epoch {}",
- offset, partition, offsetAndMetadata.leaderEpoch().get());
- } else {
- log.info("Seeking to offset {} for partition {}", offset, partition);
- }
- Metadata.LeaderAndEpoch currentLeaderAndEpoch = this.metadata.currentLeader(partition);
- SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
- offsetAndMetadata.offset(),
- offsetAndMetadata.leaderEpoch(),
- currentLeaderAndEpoch);
- this.updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
- this.subscriptions.seekUnvalidated(partition, newPosition);
- } finally {
- release();
- }
+ delegate.seek(partition, offsetAndMetadata);
}
/**
@@ -1694,16 +1175,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
- if (partitions == null)
- throw new IllegalArgumentException("Partitions collection cannot be null");
-
- acquireAndEnsureOpen();
- try {
- Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
- subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST);
- } finally {
- release();
- }
+ delegate.seekToBeginning(partitions);
}
/**
@@ -1719,16 +1191,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void seekToEnd(Collection<TopicPartition> partitions) {
- if (partitions == null)
- throw new IllegalArgumentException("Partitions collection cannot be null");
-
- acquireAndEnsureOpen();
- try {
- Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
- subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
- } finally {
- release();
- }
+ delegate.seekToEnd(partitions);
}
/**
@@ -1759,7 +1222,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public long position(TopicPartition partition) {
- return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
+ return delegate.position(partition);
}
/**
@@ -1789,26 +1252,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public long position(TopicPartition partition, final Duration timeout) {
- acquireAndEnsureOpen();
- try {
- if (!this.subscriptions.isAssigned(partition))
- throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
-
- Timer timer = time.timer(timeout);
- do {
- SubscriptionState.FetchPosition position = this.subscriptions.validPosition(partition);
- if (position != null)
- return position.offset;
-
- updateFetchPositions(timer);
- client.poll(timer);
- } while (timer.notExpired());
-
- throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
- "for partition " + partition + " could be determined");
- } finally {
- release();
- }
+ return delegate.position(partition, timeout);
}
/**
@@ -1838,7 +1282,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Deprecated
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
- return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
+ return delegate.committed(partition);
}
/**
@@ -1867,7 +1311,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Deprecated
@Override
public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
- return committed(Collections.singleton(partition), timeout).get(partition);
+ return delegate.committed(partition, timeout);
}
/**
@@ -1899,7 +1343,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
- return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+ return delegate.committed(partitions);
}
/**
@@ -1927,24 +1371,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
- acquireAndEnsureOpen();
- long start = time.nanoseconds();
- try {
- maybeThrowInvalidGroupIdException();
- final Map<TopicPartition, OffsetAndMetadata> offsets;
- offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
- if (offsets == null) {
- throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
- "committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " +
- "larger to relax the threshold.");
- } else {
- offsets.forEach(this::updateLastSeenEpochIfNewer);
- return offsets;
- }
- } finally {
- kafkaConsumerMetrics.recordCommitted(time.nanoseconds() - start);
- release();
- }
+ return delegate.committed(partitions, timeout);
}
/**
@@ -1974,7 +1401,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Uuid clientInstanceId(Duration timeout) {
- throw new UnsupportedOperationException();
+ return delegate.clientInstanceId(timeout);
}
/**
@@ -1982,7 +1409,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<MetricName, ? extends Metric> metrics() {
- return Collections.unmodifiableMap(this.metrics.metrics());
+ return delegate.metrics();
}
/**
@@ -2004,7 +1431,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
- return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
+ return delegate.partitionsFor(topic);
}
/**
@@ -2028,19 +1455,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
- acquireAndEnsureOpen();
- try {
- Cluster cluster = this.metadata.fetch();
- List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
- if (!parts.isEmpty())
- return parts;
-
- Timer timer = time.timer(timeout);
- List<PartitionInfo> topicMetadata = topicMetadataFetcher.getTopicMetadata(topic, metadata.allowAutoTopicCreation(), timer);
- return topicMetadata != null ? topicMetadata : Collections.emptyList();
- } finally {
- release();
- }
+ return delegate.partitionsFor(topic, timeout);
}
/**
@@ -2059,7 +1474,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<String, List<PartitionInfo>> listTopics() {
- return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
+ return delegate.listTopics();
}
/**
@@ -2079,12 +1494,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
- acquireAndEnsureOpen();
- try {
- return topicMetadataFetcher.getAllTopicMetadata(time.timer(timeout));
- } finally {
- release();
- }
+ return delegate.listTopics(timeout);
}
/**
@@ -2099,15 +1509,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void pause(Collection<TopicPartition> partitions) {
- acquireAndEnsureOpen();
- try {
- log.debug("Pausing partitions {}", partitions);
- for (TopicPartition partition: partitions) {
- subscriptions.pause(partition);
- }
- } finally {
- release();
- }
+ delegate.pause(partitions);
}
/**
@@ -2119,15 +1521,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void resume(Collection<TopicPartition> partitions) {
- acquireAndEnsureOpen();
- try {
- log.debug("Resuming partitions {}", partitions);
- for (TopicPartition partition: partitions) {
- subscriptions.resume(partition);
- }
- } finally {
- release();
- }
+ delegate.resume(partitions);
}
/**
@@ -2137,12 +1531,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Set<TopicPartition> paused() {
- acquireAndEnsureOpen();
- try {
- return Collections.unmodifiableSet(subscriptions.pausedPartitions());
- } finally {
- release();
- }
+ return delegate.paused();
}
/**
@@ -2168,7 +1557,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
- return offsetsForTimes(timestampsToSearch, Duration.ofMillis(defaultApiTimeoutMs));
+ return delegate.offsetsForTimes(timestampsToSearch);
}
/**
@@ -2195,19 +1584,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
- acquireAndEnsureOpen();
- try {
- for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
- // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
- // OffsetAndTimestamp is always positive.
- if (entry.getValue() < 0)
- throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
- entry.getValue() + ". The target time cannot be negative.");
- }
- return offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(timeout));
- } finally {
- release();
- }
+ return delegate.offsetsForTimes(timestampsToSearch, timeout);
}
/**
@@ -2226,7 +1603,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
- return beginningOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+ return delegate.beginningOffsets(partitions);
}
/**
@@ -2247,12 +1624,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
- acquireAndEnsureOpen();
- try {
- return offsetFetcher.beginningOffsets(partitions, time.timer(timeout));
- } finally {
- release();
- }
+ return delegate.beginningOffsets(partitions, timeout);
}
/**
@@ -2276,7 +1648,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
- return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+ return delegate.endOffsets(partitions);
}
/**
@@ -2302,12 +1674,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
- acquireAndEnsureOpen();
- try {
- return offsetFetcher.endOffsets(partitions, time.timer(timeout));
- } finally {
- release();
- }
+ return delegate.endOffsets(partitions, timeout);
}
/**
@@ -2326,30 +1693,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public OptionalLong currentLag(TopicPartition topicPartition) {
- acquireAndEnsureOpen();
- try {
- final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
-
- // if the log end offset is not known and hence cannot return lag and there is
- // no in-flight list offset requested yet,
- // issue a list offset request for that partition so that next time
- // we may get the answer; we do not need to wait for the return value
- // since we would not try to poll the network client synchronously
- if (lag == null) {
- if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
- !subscriptions.partitionEndOffsetRequested(topicPartition)) {
- log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
- subscriptions.requestPartitionEndOffset(topicPartition);
- offsetFetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));
- }
-
- return OptionalLong.empty();
- }
-
- return OptionalLong.of(lag);
- } finally {
- release();
- }
+ return delegate.currentLag(topicPartition);
}
/**
@@ -2360,13 +1704,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public ConsumerGroupMetadata groupMetadata() {
- acquireAndEnsureOpen();
- try {
- maybeThrowInvalidGroupIdException();
- return coordinator.groupMetadata();
- } finally {
- release();
- }
+ return delegate.groupMetadata();
}
/**
@@ -2393,15 +1731,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void enforceRebalance(final String reason) {
- acquireAndEnsureOpen();
- try {
- if (coordinator == null) {
- throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
- }
- coordinator.requestRejoin(reason == null || reason.isEmpty() ? DEFAULT_REASON : reason);
- } finally {
- release();
- }
+ delegate.enforceRebalance(reason);
}
/**
@@ -2409,7 +1739,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void enforceRebalance() {
- enforceRebalance(null);
+ delegate.enforceRebalance();
}
/**
@@ -2424,7 +1754,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void close() {
- close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+ delegate.close();
}
/**
@@ -2444,19 +1774,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void close(Duration timeout) {
- if (timeout.toMillis() < 0)
- throw new IllegalArgumentException("The timeout cannot be negative.");
- acquire();
- try {
- if (!closed) {
- // need to close before setting the flag since the close function
- // itself may trigger rebalance callback that needs the consumer to be open still
- close(timeout, false);
- }
- } finally {
- closed = true;
- release();
- }
+ delegate.close(timeout);
}
/**
@@ -2466,152 +1784,23 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void wakeup() {
- this.client.wakeup();
- }
-
- private Timer createTimerForRequest(final Duration timeout) {
- // this.time could be null if an exception occurs in constructor prior to setting the this.time field
- final Time localTime = (time == null) ? Time.SYSTEM : time;
- return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
- }
-
- private void close(Duration timeout, boolean swallowException) {
- log.trace("Closing the Kafka consumer");
- AtomicReference<Throwable> firstException = new AtomicReference<>();
-
- final Timer closeTimer = createTimerForRequest(timeout);
- // Close objects with a timeout. The timeout is required because the coordinator & the fetcher send requests to
- // the server in the process of closing which may not respect the overall timeout defined for closing the
- // consumer.
- if (coordinator != null) {
- // This is a blocking call bound by the time remaining in closeTimer
- swallow(log, Level.ERROR, "Failed to close coordinator with a timeout(ms)=" + closeTimer.timeoutMs(), () -> coordinator.close(closeTimer), firstException);
- }
-
- if (fetcher != null) {
- // the timeout for the session close is at-most the requestTimeoutMs
- long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
- if (remainingDurationInTimeout > 0) {
- remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout);
- }
-
- closeTimer.reset(remainingDurationInTimeout);
-
- // This is a blocking call bound by the time remaining in closeTimer
- swallow(log, Level.ERROR, "Failed to close fetcher with a timeout(ms)=" + closeTimer.timeoutMs(), () -> fetcher.close(closeTimer), firstException);
- }
-
- closeQuietly(interceptors, "consumer interceptors", firstException);
- closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
- closeQuietly(metrics, "consumer metrics", firstException);
- closeQuietly(client, "consumer network client", firstException);
- closeQuietly(deserializers, "consumer deserializers", firstException);
- AppInfoParser.unregisterAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics);
- log.debug("Kafka consumer has been closed");
- Throwable exception = firstException.get();
- if (exception != null && !swallowException) {
- if (exception instanceof InterruptException) {
- throw (InterruptException) exception;
- }
- throw new KafkaException("Failed to close kafka consumer", exception);
- }
+ delegate.wakeup();
}
- /**
- * Set the fetch position to the committed position (if there is one)
- * or reset it using the offset reset policy the user has configured.
- *
- * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
- * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
- * defined
- * @return true iff the operation completed without timing out
- */
- private boolean updateFetchPositions(final Timer timer) {
- // If any partitions have been truncated due to a leader change, we need to validate the offsets
- offsetFetcher.validatePositionsIfNeeded();
-
- cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
- if (cachedSubscriptionHasAllFetchPositions) return true;
-
- // If there are any partitions which do not have a valid position and are not
- // awaiting reset, then we need to fetch committed offsets. We will only do a
- // coordinator lookup if there are partitions which have missing positions, so
- // a consumer with manually assigned partitions can avoid a coordinator dependence
- // by always ensuring that assigned partitions have an initial position.
- if (coordinator != null && !coordinator.initWithCommittedOffsetsIfNeeded(timer)) return false;
-
- // If there are partitions still needing a position and a reset policy is defined,
- // request reset using the default policy. If no reset strategy is defined and there
- // are partitions with a missing position, then we will raise an exception.
- subscriptions.resetInitializingPositions();
-
- // Finally send an asynchronous request to look up and update the positions of any
- // partitions which are awaiting reset.
- offsetFetcher.resetPositionsIfNeeded();
-
- return true;
- }
-
- /**
- * Acquire the light lock and ensure that the consumer hasn't been closed.
- * @throws IllegalStateException If the consumer has been closed
- */
- private void acquireAndEnsureOpen() {
- acquire();
- if (this.closed) {
- release();
- throw new IllegalStateException("This consumer has already been closed.");
- }
- }
-
- /**
- * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
- * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
- * supported).
- * @throws ConcurrentModificationException if another thread already has the lock
- */
- private void acquire() {
- final Thread thread = Thread.currentThread();
- final long threadId = thread.getId();
- if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
- throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. " +
- "currentThread(name: " + thread.getName() + ", id: " + threadId + ")" +
- " otherThread(id: " + currentThread.get() + ")"
- );
- refcount.incrementAndGet();
- }
-
- /**
- * Release the light lock protecting the consumer from multi-threaded access.
- */
- private void release() {
- if (refcount.decrementAndGet() == 0)
- currentThread.set(NO_CURRENT_THREAD);
- }
-
- private void throwIfNoAssignorsConfigured() {
- if (assignors.isEmpty())
- throw new IllegalStateException("Must configure at least one partition assigner class name to " +
- ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
- }
-
- private void maybeThrowInvalidGroupIdException() {
- if (!groupId.isPresent())
- throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
- "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
+ // Functions below are for testing only
+ String clientId() {
+ return delegate.clientId();
}
- private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
- if (offsetAndMetadata != null)
- offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
+ Metrics metricsRegistry() {
+ return delegate.metricsRegistry();
}
- // Functions below are for testing only
- String getClientId() {
- return clientId;
+ KafkaConsumerMetrics kafkaConsumerMetrics() {
+ return delegate.kafkaConsumerMetrics();
}
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
- return updateAssignmentMetadataIfNeeded(timer, true);
+ return delegate.updateAssignmentMetadataIfNeeded(timer);
}
}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
similarity index 88%
rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index a90d37597a3..9f8d7206578 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -28,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
@@ -53,7 +55,6 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
@@ -79,7 +80,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
@@ -91,11 +91,10 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
@@ -105,16 +104,21 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshC
import static org.apache.kafka.common.utils.Utils.closeQuietly;
import static org.apache.kafka.common.utils.Utils.isBlank;
import static org.apache.kafka.common.utils.Utils.join;
-import static org.apache.kafka.common.utils.Utils.propsToMap;
/**
- * This prototype consumer uses an {@link ApplicationEventHandler event handler} to process
- * {@link ApplicationEvent application events} so that the network IO can be processed in a dedicated
+ * This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process
+ * {@link ApplicationEvent application events} so that the network I/O can be processed in a dedicated
* {@link ConsumerNetworkThread network thread}. Visit
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design">this document</a>
- * for detail implementation.
+ * for implementation details.
+ *
+ * <p/>
+ *
+ * <em>Note:</em> this {@link Consumer} implementation is part of the revised consumer group protocol from KIP-848.
+ * This class should not be invoked directly; users should instead create a {@link KafkaConsumer} as before.
+ * This consumer implements the new consumer group protocol and is intended to be the default in coming releases.
*/
-public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
+public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
private final ApplicationEventHandler applicationEventHandler;
private final Time time;
@@ -140,7 +144,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
private final ConsumerMetadata metadata;
private final Metrics metrics;
private final long retryBackoffMs;
- private final long defaultApiTimeoutMs;
+ private final int defaultApiTimeoutMs;
private volatile boolean closed = false;
private final List<ConsumerPartitionAssignor> assignors;
@@ -148,30 +152,9 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
private boolean cachedSubscriptionHasAllFetchPositions;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
- public PrototypeAsyncConsumer(final Properties properties,
- final Deserializer<K> keyDeserializer,
- final Deserializer<V> valueDeserializer) {
- this(propsToMap(properties), keyDeserializer, valueDeserializer);
- }
-
- public PrototypeAsyncConsumer(final Map<String, Object> configs,
- final Deserializer<K> keyDeserializer,
- final Deserializer<V> valueDeserializer) {
- this(new ConsumerConfig(appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
- keyDeserializer,
- valueDeserializer);
- }
-
- public PrototypeAsyncConsumer(final ConsumerConfig config,
- final Deserializer<K> keyDeserializer,
- final Deserializer<V> valueDeserializer) {
- this(Time.SYSTEM, config, keyDeserializer, valueDeserializer);
- }
-
- public PrototypeAsyncConsumer(final Time time,
- final ConsumerConfig config,
- final Deserializer<K> keyDeserializer,
- final Deserializer<V> valueDeserializer) {
+ AsyncKafkaConsumer(final ConsumerConfig config,
+ final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer) {
try {
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
GroupRebalanceConfig.ProtocolType.CONSUMER);
@@ -188,7 +171,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
log.debug("Initializing the Kafka consumer");
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
- this.time = time;
+ this.time = Time.SYSTEM;
this.metrics = createMetrics(config, time);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -250,7 +233,7 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
// no coordinator will be constructed for the default (null) group id
if (!groupId.isPresent()) {
config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
- //config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+ config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
}
// The FetchCollector is only used on the application thread.
@@ -278,22 +261,23 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
}
}
- public PrototypeAsyncConsumer(LogContext logContext,
- String clientId,
- Deserializers<K, V> deserializers,
- FetchBuffer fetchBuffer,
- FetchCollector<K, V> fetchCollector,
- ConsumerInterceptors<K, V> interceptors,
- Time time,
- ApplicationEventHandler applicationEventHandler,
- BlockingQueue<BackgroundEvent> backgroundEventQueue,
- Metrics metrics,
- SubscriptionState subscriptions,
- ConsumerMetadata metadata,
- long retryBackoffMs,
- int defaultApiTimeoutMs,
- List<ConsumerPartitionAssignor> assignors,
- String groupId) {
+ // Visible for testing
+ AsyncKafkaConsumer(LogContext logContext,
+ String clientId,
+ Deserializers<K, V> deserializers,
+ FetchBuffer fetchBuffer,
+ FetchCollector<K, V> fetchCollector,
+ ConsumerInterceptors<K, V> interceptors,
+ Time time,
+ ApplicationEventHandler applicationEventHandler,
+ BlockingQueue<BackgroundEvent> backgroundEventQueue,
+ Metrics metrics,
+ SubscriptionState subscriptions,
+ ConsumerMetadata metadata,
+ long retryBackoffMs,
+ int defaultApiTimeoutMs,
+ List<ConsumerPartitionAssignor> assignors,
+ String groupId) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = clientId;
@@ -314,6 +298,84 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
}
+ // Visible for testing
+ AsyncKafkaConsumer(LogContext logContext,
+ Time time,
+ ConsumerConfig config,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer,
+ KafkaClient client,
+ SubscriptionState subscriptions,
+ ConsumerMetadata metadata,
+ List<ConsumerPartitionAssignor> assignors) {
+ this.log = logContext.logger(getClass());
+ this.subscriptions = subscriptions;
+ this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+ this.fetchBuffer = new FetchBuffer(logContext);
+ this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
+ this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
+ this.time = time;
+ this.metrics = new Metrics(time);
+ this.groupId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_ID_CONFIG));
+ this.metadata = metadata;
+ this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+ this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+ this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+ this.assignors = assignors;
+
+ ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
+ FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
+ this.fetchCollector = new FetchCollector<>(logContext,
+ metadata,
+ subscriptions,
+ new FetchConfig(config),
+ deserializers,
+ fetchMetricsManager,
+ time);
+ this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
+
+ GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
+ config,
+ GroupRebalanceConfig.ProtocolType.CONSUMER
+ );
+
+ BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
+ BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
+ this.backgroundEventProcessor = new BackgroundEventProcessor(logContext, backgroundEventQueue);
+ ApiVersions apiVersions = new ApiVersions();
+ Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> new NetworkClientDelegate(
+ time,
+ config,
+ logContext,
+ client
+ );
+ Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(
+ time,
+ logContext,
+ backgroundEventQueue,
+ metadata,
+ subscriptions,
+ fetchBuffer,
+ config,
+ groupRebalanceConfig,
+ apiVersions,
+ fetchMetricsManager,
+ networkClientDelegateSupplier
+ );
+ Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
+ logContext,
+ metadata,
+ applicationEventQueue,
+ requestManagersSupplier
+ );
+ this.applicationEventHandler = new ApplicationEventHandler(logContext,
+ time,
+ applicationEventQueue,
+ applicationEventProcessorSupplier,
+ networkClientDelegateSupplier,
+ requestManagersSupplier);
+ }
+
/**
* poll implementation using {@link ApplicationEventHandler}.
* 1. Poll for background events. If there's a fetch response event, process the record and return it. If it is
@@ -970,6 +1032,9 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
final Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();
+ if (initializingPartitions.isEmpty())
+ return true;
+
log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
try {
final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(initializingPartitions);
@@ -982,23 +1047,6 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
}
}
- // This is here temporary as we don't have public access to the ConsumerConfig in this module.
- public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
- Deserializer<?> keyDeserializer,
- Deserializer<?> valueDeserializer) {
- // validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value
- Map<String, Object> newConfigs = new HashMap<>(configs);
- if (keyDeserializer != null)
- newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
- else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
- throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
- if (valueDeserializer != null)
- newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
- else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
- throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
- return newConfigs;
- }
-
private void throwIfNoAssignorsConfigured() {
if (assignors.isEmpty())
throw new IllegalStateException("Must configure at least one partition assigner class name to " +
@@ -1018,7 +1066,8 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
}
}
- boolean updateAssignmentMetadataIfNeeded(Timer timer) {
+ @Override
+ public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
backgroundEventProcessor.process();
// Keeping this updateAssignmentMetadataIfNeeded wrapping up the updateFetchPositions as
@@ -1096,4 +1145,18 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
}
}
+ @Override
+ public String clientId() {
+ return clientId;
+ }
+
+ @Override
+ public Metrics metricsRegistry() {
+ return metrics;
+ }
+
+ @Override
+ public KafkaConsumerMetrics kafkaConsumerMetrics() {
+ return kafkaConsumerMetrics;
+ }
}
\ No newline at end of file
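The revised class Javadoc above centers on handing work to a dedicated network thread through event queues. A toy model of that two-queue handoff, built only from standard java.util.concurrent types (the committed ApplicationEvent/BackgroundEvent hierarchy carries much more structure):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class TwoQueueSketch {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> applicationEvents = new LinkedBlockingQueue<>();
            BlockingQueue<String> backgroundEvents = new LinkedBlockingQueue<>();

            // Stand-in for the network thread: drains application events,
            // performs the (simulated) network I/O, and posts a reply.
            Thread networkThread = new Thread(() -> {
                try {
                    String event = applicationEvents.take();
                    backgroundEvents.put("completed: " + event);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "network-thread-sketch");
            networkThread.start();

            // The application thread never touches the network; it only
            // enqueues work and consumes results.
            applicationEvents.put("commit-offsets");
            System.out.println(backgroundEvents.take());
            networkThread.join();
        }
    }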
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index a04d3fa8437..f996f909302 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -56,6 +56,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;
import static org.apache.kafka.common.protocol.Errors.COORDINATOR_LOAD_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
@@ -64,8 +65,6 @@ import static org.apache.kafka.common.protocol.Errors.REQUEST_TIMED_OUT;
public class CommitRequestManager implements RequestManager {
- // TODO: current in ConsumerConfig but inaccessible in the internal package.
- private static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
private final SubscriptionState subscriptions;
private final LogContext logContext;
private final Logger log;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java
new file mode 100644
index 00000000000..612827ebe83
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegate.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * This extension interface provides a handful of methods to expose internals of the {@link Consumer} for
+ * various tests.
+ *
+ * <p/>
+ *
+ * <em>Note</em>: this is for internal use only and is not intended for use by end users. Internal users should
+ * not attempt to determine the underlying implementation to avoid coding to an unstable interface. Rather, it is
+ * the {@link Consumer} API contract that should serve as the caller's interface.
+ */
+public interface ConsumerDelegate<K, V> extends Consumer<K, V> {
+
+ String clientId();
+
+ Metrics metricsRegistry();
+
+ KafkaConsumerMetrics kafkaConsumerMetrics();
+
+ boolean updateAssignmentMetadataIfNeeded(final Timer timer);
+}
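A hypothetical test fragment showing the intent of these accessors; the class below is invented for illustration and is not part of the patch:

    import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;

    // Invented helper: asserts against consumer internals without casting to
    // a concrete delegate class, which is exactly what this interface avoids.
    public class DelegateAssertions {
        static void verifyInternals(ConsumerDelegate<String, String> delegate) {
            if (delegate.clientId() == null)
                throw new AssertionError("client id should be configured");
            if (delegate.metricsRegistry().metrics().isEmpty())
                throw new AssertionError("metrics registry should be populated");
        }
    }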
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
new file mode 100644
index 00000000000..bd95e06c864
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * {@code ConsumerDelegateCreator} implements a quasi-factory pattern to allow the caller to remain unaware of the
+ * underlying {@link Consumer} implementation that is created. This provides the means by which {@link KafkaConsumer}
+ * can remain the top-level facade for implementations while allowing different implementations to co-exist under
+ * the covers.
+ *
+ * <p/>
+ *
+ * The current logic for the {@code ConsumerDelegateCreator} inspects the incoming configuration and determines if
+ * it is using the new consumer group protocol (KIP-848) or if it should fall back to the existing, legacy group
+ * protocol. This is based on the presence and value of the {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG group.protocol}
+ * configuration. If the value is present and equal to "{@code consumer}", the {@link AsyncKafkaConsumer}
+ * will be returned. Otherwise, the {@link LegacyKafkaConsumer} will be returned.
+ *
+ * <p/>
+ *
+ * <em>Note</em>: this is for internal use only and is not intended for use by end users. Internal users should
+ * not attempt to determine the underlying implementation to avoid coding to an unstable interface. Rather, it is
+ * the {@link Consumer} API contract that should serve as the caller's interface.
+ */
+public class ConsumerDelegateCreator {
+
+ public <K, V> ConsumerDelegate<K, V> create(ConsumerConfig config,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer) {
+ try {
+ GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));
+
+ if (groupProtocol == GroupProtocol.CONSUMER)
+ return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
+ else
+ return new LegacyKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
+ } catch (KafkaException e) {
+ throw e;
+ } catch (Throwable t) {
+ throw new KafkaException("Failed to construct Kafka consumer", t);
+ }
+ }
+
+ public <K, V> ConsumerDelegate<K, V> create(LogContext logContext,
+ Time time,
+ ConsumerConfig config,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer,
+ KafkaClient client,
+ SubscriptionState subscriptions,
+ ConsumerMetadata metadata,
+ List<ConsumerPartitionAssignor> assignors) {
+ try {
+ GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));
+
+ if (groupProtocol == GroupProtocol.CONSUMER)
+ return new AsyncKafkaConsumer<>(
+ logContext,
+ time,
+ config,
+ keyDeserializer,
+ valueDeserializer,
+ client,
+ subscriptions,
+ metadata,
+ assignors
+ );
+ else
+ return new LegacyKafkaConsumer<>(
+ logContext,
+ time,
+ config,
+ keyDeserializer,
+ valueDeserializer,
+ client,
+ subscriptions,
+ metadata,
+ assignors
+ );
+ } catch (KafkaException e) {
+ throw e;
+ } catch (Throwable t) {
+ throw new KafkaException("Failed to construct Kafka consumer", t);
+ }
+ }
+}
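Because the factory keys off group.protocol, opting into the new protocol is a one-line configuration change. A hedged usage sketch (broker-side KIP-848 support is assumed; any value other than "consumer" falls back to the legacy delegate):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class GroupProtocolExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // "consumer" routes construction to AsyncKafkaConsumer; otherwise
            // the creator returns LegacyKafkaConsumer.
            props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // The caller never learns which delegate was constructed.
            }
        }
    }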
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index 77a2952d1b2..6519e3ef48a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -247,12 +247,10 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
closeTimeout = timeout;
wakeup();
- if (timeoutMs > 0) {
- try {
- join(timeoutMs);
- } catch (InterruptedException e) {
- log.error("Interrupted while waiting for consumer network thread to complete", e);
- }
+ try {
+ join();
+ } catch (InterruptedException e) {
+ log.error("Interrupted while waiting for consumer network thread to complete", e);
}
}
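The hunk above drops the bounded join(timeoutMs) in favor of an unbounded join(), so closing now waits for the network thread to fully terminate. For reference, a minimal illustration of the difference using plain java.lang.Thread (unrelated to the Kafka classes):

    public class JoinSketch {
        public static void main(String[] args) throws InterruptedException {
            Thread worker = new Thread(() -> {
                try {
                    Thread.sleep(100); // simulated shutdown work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            worker.join(10); // bounded: may return while worker is still alive
            System.out.println("after join(10ms): alive=" + worker.isAlive());
            worker.join();   // unbounded: returns only after worker has died
            System.out.println("after join(): alive=" + worker.isAlive());
        }
    }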
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index 92b098213b0..e267293f98b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -56,6 +56,12 @@ import java.util.concurrent.TimeUnit;
public final class ConsumerUtils {
+ /**
+ * This configuration has only package-level visibility in {@link ConsumerConfig}, so it's inaccessible in the
+ * internals package where most of its uses live. Attempts were made to move things around, but it was deemed
+ * better to leave it as is.
+ */
+ static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
public static final long DEFAULT_CLOSE_TIMEOUT_MS = 30 * 1000;
public static final String CONSUMER_JMX_PREFIX = "kafka.consumer";
public static final String CONSUMER_METRIC_GROUP_PREFIX = "consumer";
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
new file mode 100644
index 00000000000..06495c6a8fc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java
@@ -0,0 +1,1257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_RACK_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createConsumerNetworkClient;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.apache.kafka.common.utils.Utils.isBlank;
+import static org.apache.kafka.common.utils.Utils.join;
+import static org.apache.kafka.common.utils.Utils.swallow;
+
+/**
+ * A client that consumes records from a Kafka cluster using the {@link GroupProtocol#GENERIC generic group protocol}.
+ * In this implementation, all network I/O happens in the thread of the application making the call.
+ *
+ * <p/>
+ *
+ * <em>Note:</em> per its name, this implementation is left for backward compatibility purposes. The updated consumer
+ * group protocol (from KIP-848) still allows users to continue using the legacy "generic" group protocol.
+ * This class should not be invoked directly; users should instead create a {@link KafkaConsumer} as before.
+ */
+public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
+
+ private static final long NO_CURRENT_THREAD = -1L;
+ public static final String DEFAULT_REASON = "rebalance enforced by user";
+
+ private final Metrics metrics;
+ private final KafkaConsumerMetrics kafkaConsumerMetrics;
+ private Logger log;
+ private final String clientId;
+ private final Optional<String> groupId;
+ private final ConsumerCoordinator coordinator;
+ private final Deserializers<K, V> deserializers;
+ private final Fetcher<K, V> fetcher;
+ private final OffsetFetcher offsetFetcher;
+ private final TopicMetadataFetcher topicMetadataFetcher;
+ private final ConsumerInterceptors<K, V> interceptors;
+ private final IsolationLevel isolationLevel;
+
+ private final Time time;
+ private final ConsumerNetworkClient client;
+ private final SubscriptionState subscriptions;
+ private final ConsumerMetadata metadata;
+ private final long retryBackoffMs;
+ private final long retryBackoffMaxMs;
+ private final int requestTimeoutMs;
+ private final int defaultApiTimeoutMs;
+ private volatile boolean closed = false;
+ private final List<ConsumerPartitionAssignor> assignors;
+
+ // currentThread holds the threadId of the current thread accessing LegacyKafkaConsumer
+ // and is used to prevent multi-threaded access
+ private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
+ // refcount is used to allow reentrant access by the thread who has acquired currentThread
+ private final AtomicInteger refcount = new AtomicInteger(0);
+
+ // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
+ private boolean cachedSubscriptionHasAllFetchPositions;
+
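The currentThread/refcount pair above backs the consumer's long-standing single-threaded-access guard. A condensed sketch of the acquire()/release() pair that uses these fields (the originals are visible among the KafkaConsumer deletions earlier in this patch and reappear later in this file):

    import java.util.ConcurrentModificationException;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    public class LightLockSketch {
        private static final long NO_CURRENT_THREAD = -1L;
        private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
        private final AtomicInteger refcount = new AtomicInteger(0);

        // CAS the caller's thread id into currentThread; a reentrant call by
        // the owning thread only bumps the refcount.
        void acquire() {
            final long threadId = Thread.currentThread().getId();
            if (threadId != currentThread.get() &&
                    !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
                throw new ConcurrentModificationException(
                        "KafkaConsumer is not safe for multi-threaded access");
            refcount.incrementAndGet();
        }

        // Clear the owner only when the outermost caller exits.
        void release() {
            if (refcount.decrementAndGet() == 0)
                currentThread.set(NO_CURRENT_THREAD);
        }
    }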
+ LegacyKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
+ try {
+ GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
+ GroupRebalanceConfig.ProtocolType.CONSUMER);
+
+ this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
+ this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
+ LogContext logContext = createLogContext(config, groupRebalanceConfig);
+ this.log = logContext.logger(getClass());
+ boolean enableAutoCommit = config.getBoolean(ENABLE_AUTO_COMMIT_CONFIG);
+ groupId.ifPresent(groupIdStr -> {
+ if (groupIdStr.isEmpty()) {
+ log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
+ }
+ });
+
+ log.debug("Initializing the Kafka consumer");
+ this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+ this.time = Time.SYSTEM;
+ this.metrics = createMetrics(config, time);
+ this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+ this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+
+ List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
+ this.interceptors = new ConsumerInterceptors<>(interceptorList);
+ this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
+ this.subscriptions = createSubscriptionState(config, logContext);
+ ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
+ metrics.reporters(),
+ interceptorList,
+ Arrays.asList(this.deserializers.keyDeserializer, this.deserializers.valueDeserializer));
+ this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
+ List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
+ this.metadata.bootstrap(addresses);
+
+ FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
+ FetchConfig fetchConfig = new FetchConfig(config);
+ this.isolationLevel = fetchConfig.isolationLevel;
+
+ ApiVersions apiVersions = new ApiVersions();
+ this.client = createConsumerNetworkClient(config,
+ metrics,
+ logContext,
+ apiVersions,
+ time,
+ metadata,
+ fetchMetricsManager.throttleTimeSensor(),
+ retryBackoffMs);
+
+ this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
+ config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+ config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
+ );
+
+ // no coordinator will be constructed for the default (null) group id
+ if (!groupId.isPresent()) {
+ config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+ config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+ this.coordinator = null;
+ } else {
+ this.coordinator = new ConsumerCoordinator(groupRebalanceConfig,
+ logContext,
+ this.client,
+ assignors,
+ this.metadata,
+ this.subscriptions,
+ metrics,
+ CONSUMER_METRIC_GROUP_PREFIX,
+ this.time,
+ enableAutoCommit,
+ config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+ this.interceptors,
+ config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
+ config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
+ }
+ this.fetcher = new Fetcher<>(
+ logContext,
+ this.client,
+ this.metadata,
+ this.subscriptions,
+ fetchConfig,
+ this.deserializers,
+ fetchMetricsManager,
+ this.time,
+ apiVersions);
+ this.offsetFetcher = new OffsetFetcher(logContext,
+ client,
+ metadata,
+ subscriptions,
+ time,
+ retryBackoffMs,
+ requestTimeoutMs,
+ isolationLevel,
+ apiVersions);
+ this.topicMetadataFetcher = new TopicMetadataFetcher(logContext,
+ client,
+ retryBackoffMs,
+ retryBackoffMaxMs);
+
+ this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);
+
+ config.logUnused();
+ AppInfoParser.registerAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics, time.milliseconds());
+ log.debug("Kafka consumer initialized");
+ } catch (Throwable t) {
+ // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
+ // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
+ if (this.log != null) {
+ close(Duration.ZERO, true);
+ }
+ // now propagate the exception
+ throw new KafkaException("Failed to construct kafka consumer", t);
+ }
+ }
+
+ // visible for testing
+ LegacyKafkaConsumer(LogContext logContext,
+ Time time,
+ ConsumerConfig config,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer,
+ KafkaClient client,
+ SubscriptionState subscriptions,
+ ConsumerMetadata metadata,
+ List<ConsumerPartitionAssignor> assignors) {
+ this.log = logContext.logger(getClass());
+ this.time = time;
+ this.subscriptions = subscriptions;
+ this.metadata = metadata;
+ this.metrics = new Metrics(time);
+ this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+ this.groupId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_ID_CONFIG));
+ this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+ this.isolationLevel = ConsumerUtils.configuredIsolationLevel(config);
+ this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+ this.assignors = assignors;
+ this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);
+ this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
+ this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+ this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+ this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+
+ int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
+ int rebalanceTimeoutMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
+ int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+ boolean enableAutoCommit = config.getBoolean(ENABLE_AUTO_COMMIT_CONFIG);
+ boolean throwOnStableOffsetNotSupported = config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+ int autoCommitIntervalMs = config.getInt(AUTO_COMMIT_INTERVAL_MS_CONFIG);
+ String rackId = config.getString(CLIENT_RACK_CONFIG);
+ Optional<String> groupInstanceId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
+
+ this.client = new ConsumerNetworkClient(
+ logContext,
+ client,
+ metadata,
+ time,
+ retryBackoffMs,
+ requestTimeoutMs,
+ heartbeatIntervalMs
+ );
+
+ if (groupId.isPresent()) {
+ GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(
+ sessionTimeoutMs,
+ rebalanceTimeoutMs,
+ heartbeatIntervalMs,
+ groupId.get(),
+ groupInstanceId,
+ retryBackoffMs,
+ retryBackoffMaxMs,
+ true
+ );
+ this.coordinator = new ConsumerCoordinator(
+ rebalanceConfig,
+ logContext,
+ this.client,
+ assignors,
+ metadata,
+ subscriptions,
+ metrics,
+ CONSUMER_METRIC_GROUP_PREFIX,
+ time,
+ enableAutoCommit,
+ autoCommitIntervalMs,
+ interceptors,
+ throwOnStableOffsetNotSupported,
+ rackId
+ );
+ } else {
+ this.coordinator = null;
+ }
+
+ int maxBytes = config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG);
+ int maxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
+ int minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG);
+ int fetchSize = config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+ int maxPollRecords = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
+ boolean checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);
+
+ ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
+ FetchMetricsManager metricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
+ ApiVersions apiVersions = new ApiVersions();
+ FetchConfig fetchConfig = new FetchConfig(
+ minBytes,
+ maxBytes,
+ maxWaitMs,
+ fetchSize,
+ maxPollRecords,
+ checkCrcs,
+ rackId,
+ isolationLevel
+ );
+ this.fetcher = new Fetcher<>(
+ logContext,
+ this.client,
+ metadata,
+ subscriptions,
+ fetchConfig,
+ deserializers,
+ metricsManager,
+ time,
+ apiVersions
+ );
+ this.offsetFetcher = new OffsetFetcher(
+ logContext,
+ this.client,
+ metadata,
+ subscriptions,
+ time,
+ retryBackoffMs,
+ requestTimeoutMs,
+ isolationLevel,
+ apiVersions
+ );
+ this.topicMetadataFetcher = new TopicMetadataFetcher(
+ logContext,
+ this.client,
+ retryBackoffMs,
+ retryBackoffMaxMs
+ );
+ }
+
+ public Set<TopicPartition> assignment() {
+ acquireAndEnsureOpen();
+ try {
+ return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
+ } finally {
+ release();
+ }
+ }
+
+ public Set<String> subscription() {
+ acquireAndEnsureOpen();
+ try {
+ return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.subscription()));
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
+ if (listener == null)
+ throw new IllegalArgumentException("RebalanceListener cannot be null");
+
+ subscribeInternal(topics, Optional.of(listener));
+ }
+
+ @Override
+ public void subscribe(Collection<String> topics) {
+ subscribeInternal(topics, Optional.empty());
+ }
+
+ /**
+ * Internal helper method for {@link #subscribe(Collection)} and
+ * {@link #subscribe(Collection, ConsumerRebalanceListener)}
+ * <p>
+ * Subscribe to the given list of topics to get dynamically assigned partitions.
+ * <b>Topic subscriptions are not incremental. This list will replace the current
+ * assignment (if there is one).</b> It is not possible to combine topic subscription via group management
+ * with manual partition assignment through {@link #assign(Collection)}.
+ *
+ * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
+ *
+ * <p>
+ * @param topics The list of topics to subscribe to
+ * @param listener {@link Optional} listener instance to get notifications on partition assignment/revocation
+ * for the subscribed topics
+ * @throws IllegalArgumentException If topics is null or contains null or empty elements
+ * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
+ * previously (without a subsequent call to {@link #unsubscribe()}), or if no
+ * partition assignment strategy has been configured
+ */
+ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
+ acquireAndEnsureOpen();
+ try {
+ maybeThrowInvalidGroupIdException();
+ if (topics == null)
+ throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
+ if (topics.isEmpty()) {
+ // treat subscribing to empty topic list as the same as unsubscribing
+ this.unsubscribe();
+ } else {
+ for (String topic : topics) {
+ if (isBlank(topic))
+ throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
+ }
+
+ throwIfNoAssignorsConfigured();
+
+ // Clear the buffered data which are not a part of newly assigned topics
+ final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+
+ for (TopicPartition tp : subscriptions.assignedPartitions()) {
+ if (topics.contains(tp.topic()))
+ currentTopicPartitions.add(tp);
+ }
+
+ fetcher.clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
+
+ log.info("Subscribed to topic(s): {}", join(topics, ", "));
+ if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
+ metadata.requestUpdateForNewTopics();
+ }
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+ if (listener == null)
+ throw new IllegalArgumentException("RebalanceListener cannot be null");
+
+ subscribeInternal(pattern, Optional.of(listener));
+ }
+
+ @Override
+ public void subscribe(Pattern pattern) {
+ subscribeInternal(pattern, Optional.empty());
+ }
+
+ /**
+ * Internal helper method for {@link #subscribe(Pattern)} and
+ * {@link #subscribe(Pattern, ConsumerRebalanceListener)}
+ * <p>
+ * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+ * The pattern matching will be done periodically against all topics existing at the time of check.
+ * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering
+ * the max metadata age, the consumer will refresh metadata more often and check for matching topics.
+ * <p>
+ * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the
+ * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there
+ * is a change to the topics matching the provided pattern and when consumer group membership changes.
+ * Group rebalances only take place during an active call to {@link #poll(Duration)}.
+ *
+ * @param pattern Pattern to subscribe to
+ * @param listener {@link Optional} listener instance to get notifications on partition assignment/revocation
+ * for the subscribed topics
+ * @throws IllegalArgumentException If pattern or listener is null
+ * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
+ * previously (without a subsequent call to {@link #unsubscribe()}), or if no
+ * partition assignment strategy has been configured
+ */
+ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
+ maybeThrowInvalidGroupIdException();
+ if (pattern == null || pattern.toString().equals(""))
+ throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+ "null" : "empty"));
+
+ acquireAndEnsureOpen();
+ try {
+ throwIfNoAssignorsConfigured();
+ log.info("Subscribed to pattern: '{}'", pattern);
+ this.subscriptions.subscribe(pattern, listener);
+ this.coordinator.updatePatternSubscription(metadata.fetch());
+ this.metadata.requestUpdateForNewTopics();
+ } finally {
+ release();
+ }
+ }
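A short usage sketch of the pattern subscription documented above; the topic pattern and config values are illustrative only:

    import java.time.Duration;
    import java.util.Properties;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class PatternSubscribeExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "pattern-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // Lower metadata.max.age.ms so newly created matching topics are
            // discovered sooner, as the Javadoc above suggests.
            props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 30_000);

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Pattern.compile("orders-.*"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                System.out.println("fetched " + records.count() + " records");
            }
        }
    }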
+
+ public void unsubscribe() {
+ acquireAndEnsureOpen();
+ try {
+ fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
+ if (this.coordinator != null) {
+ this.coordinator.onLeavePrepare();
+ this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
+ }
+ this.subscriptions.unsubscribe();
+ log.info("Unsubscribed all topics or patterns and assigned partitions");
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public void assign(Collection<TopicPartition> partitions) {
+ acquireAndEnsureOpen();
+ try {
+ if (partitions == null) {
+ throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
+ } else if (partitions.isEmpty()) {
+ this.unsubscribe();
+ } else {
+ for (TopicPartition tp : partitions) {
+ String topic = (tp != null) ? tp.topic() : null;
+ if (isBlank(topic))
+ throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
+ }
+ fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+ // make sure the offsets of topic partitions the consumer is unsubscribing from
+ // are committed since there will be no following rebalance
+ if (coordinator != null)
+ this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+
+ log.info("Assigned to partition(s): {}", join(partitions, ", "));
+ if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
+ metadata.requestUpdateForNewTopics();
+ }
+ } finally {
+ release();
+ }
+ }
+
+ @Deprecated
+ @Override
+ public ConsumerRecords<K, V> poll(final long timeoutMs) {
+ return poll(time.timer(timeoutMs), false);
+ }
+
+ @Override
+ public ConsumerRecords<K, V> poll(final Duration timeout) {
+ return poll(time.timer(timeout), true);
+ }
+
+ /**
+ * @throws KafkaException if the rebalance callback throws exception
+ */
+ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
+ acquireAndEnsureOpen();
+ try {
+ this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
+
+ if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
+ throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
+ }
+
+ do {
+ client.maybeTriggerWakeup();
+
+ if (includeMetadataInTimeout) {
+ // try to update assignment metadata BUT do not need to block on the timer for join group
+ updateAssignmentMetadataIfNeeded(timer, false);
+ } else {
+ while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
+ log.warn("Still waiting for metadata");
+ }
+ }
+
+ final Fetch<K, V> fetch = pollForFetches(timer);
+ if (!fetch.isEmpty()) {
+ // before returning the fetched records, we can send off the next round of fetches
+ // and avoid block waiting for their responses to enable pipelining while the user
+ // is handling the fetched records.
+ //
+ // NOTE: since the consumed position has already been updated, we must not allow
+ // wakeups or any other errors to be triggered prior to returning the fetched records.
+ if (sendFetches() > 0 || client.hasPendingRequests()) {
+ client.transmitSends();
+ }
+
+ if (fetch.records().isEmpty()) {
+ log.trace("Returning empty records from `poll()` "
+ + "since the consumer's position has advanced for at least one topic partition");
+ }
+
+ return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
+ }
+ } while (timer.notExpired());
+
+ return ConsumerRecords.empty();
+ } finally {
+ release();
+ this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
+ }
+ }
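For context, the standard consume loop that drives the method above; the topic name and loop bound are illustrative:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class PollLoopExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-loop-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("orders"));
                for (int i = 0; i < 10; i++) { // bounded loop for the sketch
                    // Each poll() performs the metadata update, wakeup check,
                    // and fetch pipelining walked through in the method above.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records)
                        System.out.printf("%s-%d@%d%n", record.topic(), record.partition(), record.offset());
                }
            }
        }
    }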
+
+ private int sendFetches() {
+ offsetFetcher.validatePositionsOnMetadataChange();
+ return fetcher.sendFetches();
+ }
+
+ boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
+ if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
+ return false;
+ }
+
+ return updateFetchPositions(timer);
+ }
+
+ /**
+ * @throws KafkaException if the rebalance callback throws exception
+ */
+ private Fetch<K, V> pollForFetches(Timer timer) {
+ long pollTimeout = coordinator == null ? timer.remainingMs() :
+ Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
+
+ // if data is available already, return it immediately
+ final Fetch<K, V> fetch = fetcher.collectFetch();
+ if (!fetch.isEmpty()) {
+ return fetch;
+ }
+
+ // send any new fetches (won't resend pending fetches)
+ sendFetches();
+
+ // We do not want to be stuck blocking in poll if we are missing some positions
+ // since the offset lookup may be backing off after a failure
+
+ // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
+ // updateAssignmentMetadataIfNeeded before this method.
+ if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+ pollTimeout = retryBackoffMs;
+ }
+
+ log.trace("Polling for fetches with timeout {}", pollTimeout);
+
+ Timer pollTimer = time.timer(pollTimeout);
+ client.poll(pollTimer, () -> {
+ // since a fetch might be completed by the background thread, we need this poll condition
+ // to ensure that we do not block unnecessarily in poll()
+ return !fetcher.hasAvailableFetches();
+ });
+ timer.update(pollTimer.currentTimeMs());
+
+ return fetcher.collectFetch();
+ }
+
+ @Override
+ public void commitSync() {
+ commitSync(Duration.ofMillis(defaultApiTimeoutMs));
+ }
+
+ @Override
+ public void commitSync(Duration timeout) {
+ commitSync(subscriptions.allConsumed(), timeout);
+ }
+
+ @Override
+ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+ commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
+ }
+
+ @Override
+ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
+ acquireAndEnsureOpen();
+ long commitStart = time.nanoseconds();
+ try {
+ maybeThrowInvalidGroupIdException();
+ offsets.forEach(this::updateLastSeenEpochIfNewer);
+ if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
+ throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
+ "committing offsets " + offsets);
+ }
+ } finally {
+ kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
+ release();
+ }
+ }
+
+ @Override
+ public void commitAsync() {
+ commitAsync(null);
+ }
+
+ @Override
+ public void commitAsync(OffsetCommitCallback callback) {
+ commitAsync(subscriptions.allConsumed(), callback);
+ }
+
+ @Override
+ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+ acquireAndEnsureOpen();
+ try {
+ maybeThrowInvalidGroupIdException();
+ log.debug("Committing offsets: {}", offsets);
+ offsets.forEach(this::updateLastSeenEpochIfNewer);
+ coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
+ } finally {
+ release();
+ }
+ }
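A small sketch pairing the asynchronous commit above with a blocking commit on shutdown, a common pattern; the consumer argument is assumed to be active and subscribed:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommitSketch {
        static <K, V> void commitWithFallback(KafkaConsumer<K, V> consumer) {
            // Non-blocking commit on the hot path; the callback is invoked
            // from a later poll() call.
            consumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) -> {
                if (e != null)
                    System.err.println("async commit failed for " + offsets + ": " + e);
            });
            // On shutdown, fall back to a blocking commit so the final
            // positions are not lost.
            consumer.commitSync();
        }
    }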
+
+ @Override
+ public void seek(TopicPartition partition, long offset) {
+ if (offset < 0)
+ throw new IllegalArgumentException("seek offset must not be a negative number");
+
+ acquireAndEnsureOpen();
+ try {
+ log.info("Seeking to offset {} for partition {}", offset, partition);
+ SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
+ offset,
+ Optional.empty(), // This will ensure we skip validation
+ this.metadata.currentLeader(partition));
+ this.subscriptions.seekUnvalidated(partition, newPosition);
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
+ long offset = offsetAndMetadata.offset();
+ if (offset < 0) {
+ throw new IllegalArgumentException("seek offset must not be a negative number");
+ }
+
+ acquireAndEnsureOpen();
+ try {
+ if (offsetAndMetadata.leaderEpoch().isPresent()) {
+ log.info("Seeking to offset {} for partition {} with epoch {}",
+ offset, partition, offsetAndMetadata.leaderEpoch().get());
+ } else {
+ log.info("Seeking to offset {} for partition {}", offset, partition);
+ }
+ Metadata.LeaderAndEpoch currentLeaderAndEpoch = this.metadata.currentLeader(partition);
+ SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
+ offsetAndMetadata.offset(),
+ offsetAndMetadata.leaderEpoch(),
+ currentLeaderAndEpoch);
+ this.updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
+ this.subscriptions.seekUnvalidated(partition, newPosition);
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public void seekToBeginning(Collection<TopicPartition> partitions) {
+ if (partitions == null)
+ throw new IllegalArgumentException("Partitions collection cannot be null");
+
+ acquireAndEnsureOpen();
+ try {
+ Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
+ subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST);
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public void seekToEnd(Collection<TopicPartition> partitions) {
+ if (partitions == null)
+ throw new IllegalArgumentException("Partitions collection cannot be null");
+
+ acquireAndEnsureOpen();
+ try {
+ Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
+ subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public long position(TopicPartition partition) {
+ return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
+ }
+
+ @Override
+ public long position(TopicPartition partition, final Duration timeout) {
+ acquireAndEnsureOpen();
+ try {
+ if (!this.subscriptions.isAssigned(partition))
+ throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
+
+ Timer timer = time.timer(timeout);
+ do {
+ SubscriptionState.FetchPosition position = this.subscriptions.validPosition(partition);
+ if (position != null)
+ return position.offset;
+
+ updateFetchPositions(timer);
+ client.poll(timer);
+ } while (timer.notExpired());
+
+ throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
+ "for partition " + partition + " could be determined");
+ } finally {
+ release();
+ }
+ }
+
+ @Deprecated
+ @Override
+ public OffsetAndMetadata committed(TopicPartition partition) {
+ return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
+ }
+
+ @Deprecated
+ @Override
+ public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
+ return committed(Collections.singleton(partition), timeout).get(partition);
+ }
+
+ @Override
+ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
+ return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+ }
+
+ @Override
+ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
+ acquireAndEnsureOpen();
+ long start = time.nanoseconds();
+ try {
+ maybeThrowInvalidGroupIdException();
+ final Map<TopicPartition, OffsetAndMetadata> offsets;
+ offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
+ if (offsets == null) {
+ throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
+ "committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " +
+ "larger to relax the threshold.");
+ } else {
+ offsets.forEach(this::updateLastSeenEpochIfNewer);
+ return offsets;
+ }
+ } finally {
+ kafkaConsumerMetrics.recordCommitted(time.nanoseconds() - start);
+ release();
+ }
+ }
+
+ @Override
+ public Uuid clientInstanceId(Duration timeout) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<MetricName, ? extends Metric> metrics() {
+ return Collections.unmodifiableMap(this.metrics.metrics());
+ }
+
+ @Override
+ public List<PartitionInfo> partitionsFor(String topic) {
+ return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
+ }
+
+ @Override
+ public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
+ acquireAndEnsureOpen();
+ try {
+ Cluster cluster = this.metadata.fetch();
+ List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
+ if (!parts.isEmpty())
+ return parts;
+
+ Timer timer = time.timer(timeout);
+ List<PartitionInfo> topicMetadata = topicMetadataFetcher.getTopicMetadata(topic, metadata.allowAutoTopicCreation(), timer);
+ return topicMetadata != null ? topicMetadata : Collections.emptyList();
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public Map<String, List<PartitionInfo>> listTopics() {
+ return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
+ }
+
+ @Override
+ public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
+ acquireAndEnsureOpen();
+ try {
+ return topicMetadataFetcher.getAllTopicMetadata(time.timer(timeout));
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public void pause(Collection<TopicPartition> partitions) {
+ acquireAndEnsureOpen();
+ try {
+ log.debug("Pausing partitions {}", partitions);
+ for (TopicPartition partition: partitions) {
+ subscriptions.pause(partition);
+ }
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public void resume(Collection<TopicPartition> partitions) {
+ acquireAndEnsureOpen();
+ try {
+ log.debug("Resuming partitions {}", partitions);
+ for (TopicPartition partition: partitions) {
+ subscriptions.resume(partition);
+ }
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public Set<TopicPartition> paused() {
+ acquireAndEnsureOpen();
+ try {
+ return Collections.unmodifiableSet(subscriptions.pausedPartitions());
+ } finally {
+ release();
+ }
+ }
+
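
The pause()/resume()/paused() trio above exists for flow control: a paused partition stays assigned, but poll() returns no records from it. A sketch of the usual backpressure loop (the buffer bound and downstream hand-off are hypothetical):

    import java.time.Duration;
    import java.util.ArrayDeque;
    import java.util.Deque;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    static void pollWithBackpressure(KafkaConsumer<String, String> consumer) {
        final int highWatermark = 1_000; // hypothetical bound
        Deque<ConsumerRecord<String, String>> buffer = new ArrayDeque<>();
        while (true) {
            consumer.poll(Duration.ofMillis(100)).forEach(buffer::add);
            // a drain(buffer) step would hand records to a slower downstream stage (hypothetical)
            if (buffer.size() >= highWatermark)
                consumer.pause(consumer.assignment());   // keep calling poll() without fetching more
            else if (!consumer.paused().isEmpty())
                consumer.resume(consumer.paused());
        }
    }
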
+ @Override
+ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
+ return offsetsForTimes(timestampsToSearch, Duration.ofMillis(defaultApiTimeoutMs));
+ }
+
+ @Override
+ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
+ acquireAndEnsureOpen();
+ try {
+ for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
+ // negative target timestamps are rejected because negative values are reserved as
+ // sentinels for the earliest/latest offsets; this also guarantees the timestamp in
+ // the returned OffsetAndTimestamp is always positive.
+ if (entry.getValue() < 0)
+ throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
+ entry.getValue() + ". The target time cannot be negative.");
+ }
+ return offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(timeout));
+ } finally {
+ release();
+ }
+ }
+
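
One common use of offsetsForTimes() above is replaying from a point in time: query the offset for each assigned partition, then seek. A sketch, assuming a configured consumer; a null entry means no record exists at or after the timestamp:

    import java.time.Duration;
    import java.time.Instant;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    static void seekToTimestamp(KafkaConsumer<String, String> consumer, Instant pointInTime) {
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : consumer.assignment())
            query.put(tp, pointInTime.toEpochMilli()); // must be non-negative, per the check above
        Map<TopicPartition, OffsetAndTimestamp> result =
            consumer.offsetsForTimes(query, Duration.ofSeconds(10));
        result.forEach((tp, oat) -> {
            if (oat != null)
                consumer.seek(tp, oat.offset());
        });
    }
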
+ @Override
+ public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+ return beginningOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+ }
+
+ @Override
+ public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
+ acquireAndEnsureOpen();
+ try {
+ return offsetFetcher.beginningOffsets(partitions, time.timer(timeout));
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+ return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+ }
+
+ @Override
+ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
+ acquireAndEnsureOpen();
+ try {
+ return offsetFetcher.endOffsets(partitions, time.timer(timeout));
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public OptionalLong currentLag(TopicPartition topicPartition) {
+ acquireAndEnsureOpen();
+ try {
+ final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
+
+ // If the log end offset is not known, the lag cannot be computed. If no list-offsets
+ // request is in flight for this partition yet, issue one so that a later call may get
+ // the answer; there is no need to wait for the response, since the network client is
+ // not polled synchronously here.
+ if (lag == null) {
+ if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
+ !subscriptions.partitionEndOffsetRequested(topicPartition)) {
+ log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
+ subscriptions.requestPartitionEndOffset(topicPartition);
+ offsetFetcher.endOffsets(Collections.singleton(topicPartition), time.timer(0L));
+ }
+
+ return OptionalLong.empty();
+ }
+
+ return OptionalLong.of(lag);
+ } finally {
+ release();
+ }
+ }
+
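
As the comment above explains, currentLag() is deliberately non-blocking: an empty result means the end offset is not known yet, and the call itself has already requested it asynchronously. Callers therefore just retry on a later iteration; a sketch (the metrics sink is hypothetical):

    import java.util.OptionalLong;

    // Inside the application's poll loop:
    OptionalLong lag = consumer.currentLag(tp);
    if (lag.isPresent())
        lagGauge.record(lag.getAsLong()); // hypothetical metrics sink
    // else: not an error; the end offset will typically be available on a later call
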
+ @Override
+ public ConsumerGroupMetadata groupMetadata() {
+ acquireAndEnsureOpen();
+ try {
+ maybeThrowInvalidGroupIdException();
+ return coordinator.groupMetadata();
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public void enforceRebalance(final String reason) {
+ acquireAndEnsureOpen();
+ try {
+ if (coordinator == null) {
+ throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
+ }
+ coordinator.requestRejoin(reason == null || reason.isEmpty() ? DEFAULT_REASON : reason);
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public void enforceRebalance() {
+ enforceRebalance(null);
+ }
+
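
A short sketch of enforceRebalance() above, which asks the coordinator to redistribute partitions, e.g. when local conditions change; the reason string is illustrative, and a null or empty reason falls back to DEFAULT_REASON per the code above:

    // Fails with IllegalStateException if the consumer has no group (coordinator == null).
    consumer.enforceRebalance("local resource pressure"); // hypothetical reason
    consumer.enforceRebalance();                          // same, with the default reason
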
+ @Override
+ public void close() {
+ close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+ }
+
+ @Override
+ public void close(Duration timeout) {
+ if (timeout.toMillis() < 0)
+ throw new IllegalArgumentException("The timeout cannot be negative.");
+ acquire();
+ try {
+ if (!closed) {
+ // close before setting the flag, since close() itself may trigger a rebalance
+ // callback that requires the consumer to still be open
+ close(timeout, false);
+ }
+ } finally {
+ closed = true;
+ release();
+ }
+ }
+
+ @Override
+ public void wakeup() {
+ this.client.wakeup();
+ }
+
+ private Timer createTimerForRequest(final Duration timeout) {
+ // this.time could be null if an exception occurred in the constructor before the time field was set
+ final Time localTime = (time == null) ? Time.SYSTEM : time;
+ return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+ }
+
+ private void close(Duration timeout, boolean swallowException) {
+ log.trace("Closing the Kafka consumer");
+ AtomicReference<Throwable> firstException = new AtomicReference<>();
+
+ final Timer closeTimer = createTimerForRequest(timeout);
+ // Close objects with a timeout. The timeout is required because the coordinator and the fetcher
+ // send requests to the server while closing, and those requests may not respect the overall
+ // timeout defined for closing the consumer.
+ if (coordinator != null) {
+ // This is a blocking call bound by the time remaining in closeTimer
+ swallow(log, Level.ERROR, "Failed to close coordinator with a timeout(ms)=" + closeTimer.timeoutMs(), () -> coordinator.close(closeTimer), firstException);
+ }
+
+ if (fetcher != null) {
+ // the timeout for the session close is at most the requestTimeoutMs
+ long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
+ if (remainingDurationInTimeout > 0) {
+ remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout);
+ }
+
+ closeTimer.reset(remainingDurationInTimeout);
+
+ // This is a blocking call bound by the time remaining in closeTimer
+ swallow(log, Level.ERROR, "Failed to close fetcher with a timeout(ms)=" + closeTimer.timeoutMs(), () -> fetcher.close(closeTimer), firstException);
+ }
+
+ closeQuietly(interceptors, "consumer interceptors", firstException);
+ closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
+ closeQuietly(metrics, "consumer metrics", firstException);
+ closeQuietly(client, "consumer network client", firstException);
+ closeQuietly(deserializers, "consumer deserializers", firstException);
+ AppInfoParser.unregisterAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics);
+ log.debug("Kafka consumer has been closed");
+ Throwable exception = firstException.get();
+ if (exception != null && !swallowException) {
+ if (exception instanceof InterruptException) {
+ throw (InterruptException) exception;
+ }
+ throw new KafkaException("Failed to close Kafka consumer", exception);
+ }
+ }
+
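
wakeup() and the bounded close() above are designed to pair up in the common shutdown pattern: another thread calls wakeup() to abort a blocking poll(), and close(timeout) caps the commit/leave-group work. A sketch, with record processing left hypothetical:

    import java.time.Duration;
    import org.apache.kafka.common.errors.WakeupException;

    Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
    try {
        while (true) {
            consumer.poll(Duration.ofMillis(100)).forEach(rec -> { /* process(rec), hypothetical */ });
        }
    } catch (WakeupException e) {
        // expected on shutdown; fall through to close
    } finally {
        consumer.close(Duration.ofSeconds(5)); // bounded by the timer logic above
    }
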
+ /**
+ * Set the fetch position to the committed position (if there is one)
+ * or reset it using the offset reset policy the user has configured.
+ *
+ * @return true iff the operation completed without timing out
+ * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws NoOffsetForPartitionException if no offset is stored for a given partition and no offset reset policy is
+ * defined
+ */
+ private boolean updateFetchPositions(final Timer timer) {
+ // If any partitions have been truncated due to a leader change, we need to validate the offsets
+ offsetFetcher.validatePositionsIfNeeded();
+
+ cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
+ if (cachedSubscriptionHasAllFetchPositions) return true;
+
+ // If there are any partitions which do not have a valid position and are not
+ // awaiting reset, then we need to fetch committed offsets. We will only do a
+ // coordinator lookup if there are partitions which have missing positions, so
+ // a consumer with manually assigned partitions can avoid a coordinator dependence
+ // by always ensuring that assigned partitions have an initial position.
+ if (coordinator != null && !coordinator.initWithCommittedOffsetsIfNeeded(timer)) return false;
+
+ // If there are partitions still needing a position and a reset policy is defined,
+ // request reset using the default policy. If no reset strategy is defined and there
+ // are partitions with a missing position, then we will raise an exception.
+ subscriptions.resetInitializingPositions();
+
+ // Finally send an asynchronous request to look up and update the positions of any
+ // partitions which are awaiting reset.
+ offsetFetcher.resetPositionsIfNeeded();
+
+ return true;
+ }
+
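
Per the javadoc above, a partition with neither a committed offset nor a reset policy surfaces as NoOffsetForPartitionException. A sketch of the two usual ways callers deal with that; both the property value and the recovery choice are illustrative:

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;

    // Option 1: configure a reset policy up front so resetInitializingPositions() can apply it.
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest"

    // Option 2: keep auto.offset.reset=none and recover explicitly.
    try {
        consumer.poll(Duration.ZERO);
    } catch (NoOffsetForPartitionException e) {
        consumer.seekToBeginning(e.partitions()); // seeking is one possible recovery
    }
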
+ /**
+ * Acquire the light lock and ensure that the consumer hasn't been closed.
+ * @throws IllegalStateException If the consumer has been closed
+ */
+ private void acquireAndEnsureOpen() {
+ acquire();
+ if (this.closed) {
+ release();
+ throw new IllegalStateException("This consumer has already been closed.");
+ }
+ }
+
+ /**
+ * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
+ * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
+ * supported).
+ * @throws ConcurrentModificationException if another thread already has the lock
+ */
+ private void acquire() {
+ final Thread thread = Thread.currentThread();
+ final long threadId = thread.getId();
+ if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
+ throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. " +
+ "currentThread(name: " + thread.getName() + ", id: " + threadId + ")" +
+ " otherThread(id: " + currentThread.get() + ")"
+ );
+ refcount.incrementAndGet();
+ }
+
+ /**
+ * Release the light lock protecting the consumer from multi-threaded access.
+ */
+ private void release() {
+ if (refcount.decrementAndGet() == 0)
+ currentThread.set(NO_CURRENT_THREAD);
+ }
+
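
acquire()/release() above implement a non-blocking "light lock": a compare-and-set on the owning thread id, plus a refcount so the same thread can re-enter. A self-contained sketch of the same pattern in isolation:

    import java.util.ConcurrentModificationException;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    final class ThreadGuard {
        private static final long NO_OWNER = -1L;
        private final AtomicLong owner = new AtomicLong(NO_OWNER);
        private final AtomicInteger refcount = new AtomicInteger(0);

        void acquire() {
            long threadId = Thread.currentThread().getId();
            // Fail fast rather than block: succeed if we already own the guard
            // (re-entrant call) or if we can CAS from "no owner" to our id.
            if (threadId != owner.get() && !owner.compareAndSet(NO_OWNER, threadId))
                throw new ConcurrentModificationException("not safe for multi-threaded access");
            refcount.incrementAndGet();
        }

        void release() {
            // Only the outermost release clears ownership.
            if (refcount.decrementAndGet() == 0)
                owner.set(NO_OWNER);
        }
    }
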
+ private void throwIfNoAssignorsConfigured() {
+ if (assignors.isEmpty())
+ throw new IllegalStateException("Must configure at least one partition assigner class name to " +
+ ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
+ }
+
+ private void maybeThrowInvalidGroupIdException() {
+ if (!groupId.isPresent())
+ throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
+ "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
+ }
+
+ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
+ if (offsetAndMetadata != null)
+ offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
+ }
+
+ // Functions below are for testing only
+ @Override
+ public String clientId() {
+ return clientId;
+ }
+
+ @Override
+ public Metrics metricsRegistry() {
+ return metrics;
+ }
+
+ @Override
+ public KafkaConsumerMetrics kafkaConsumerMetrics() {
+ return kafkaConsumerMetrics;
+ }
+
+ @Override
+ public boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
+ return updateAssignmentMetadataIfNeeded(timer, true);
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 58181b68535..df3ba98dd0c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -39,7 +39,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
public class ConsumerConfigTest {
@@ -66,18 +65,24 @@ public class ConsumerConfigTest {
@Test
public void testOverrideEnableAutoCommit() {
- ConsumerConfig config = new ConsumerConfig(properties);
- boolean overrideEnableAutoCommit = config.maybeOverrideEnableAutoCommit();
- assertFalse(overrideEnableAutoCommit);
-
- properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- config = new ConsumerConfig(properties);
- try {
- config.maybeOverrideEnableAutoCommit();
- fail("Should have thrown an exception");
- } catch (InvalidConfigurationException e) {
- // expected
- }
+ // Verify that our default properties (no 'enable.auto.commit' or 'group.id') are valid.
+ assertFalse(new ConsumerConfig(properties).getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+
+ // Verify that explicitly disabling 'enable.auto.commit' still works.
+ properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE.toString());
+ assertFalse(new ConsumerConfig(properties).getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+
+ // Verify that enabling 'enable.auto.commit' without 'group.id' fails.
+ properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE.toString());
+ assertThrows(InvalidConfigurationException.class, () -> new ConsumerConfig(properties));
+
+ // Verify that adding 'group.id' then allows the configuration to pass validation.
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+ assertTrue(new ConsumerConfig(properties).getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+
+ // Now remove the 'enable.auto.commit' flag and verify that it defaults to true.
+ properties.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+ assertTrue(new ConsumerConfig(properties).getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index b4c3dba56fb..b07ce529c36 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -16,27 +16,15 @@
*/
package org.apache.kafka.clients.consumer;
-import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
-import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
-import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
-import org.apache.kafka.clients.consumer.internals.Deserializers;
-import org.apache.kafka.clients.consumer.internals.FetchConfig;
-import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
-import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
-import org.apache.kafka.clients.consumer.internals.OffsetFetcher;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.clients.consumer.internals.TopicMetadataFetcher;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
@@ -106,7 +94,8 @@ import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -124,6 +113,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
@@ -146,8 +136,9 @@ import java.util.stream.Stream;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
-import static org.apache.kafka.clients.consumer.KafkaConsumer.DEFAULT_REASON;
+import static org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.DEFAULT_REASON;
import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
+import static org.apache.kafka.common.utils.Utils.propsToMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -181,13 +172,11 @@ public class KafkaConsumerTest {
private final int sessionTimeoutMs = 10000;
private final int defaultApiTimeoutMs = 60000;
- private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
private final int heartbeatIntervalMs = 1000;
// Set auto commit interval lower than heartbeat so we don't need to deal with
// a concurrent heartbeat request
private final int autoCommitIntervalMs = 500;
- private final int throttleMs = 10;
private final String groupId = "mock-group";
private final String memberId = "memberId";
@@ -222,42 +211,51 @@ public class KafkaConsumerTest {
}
}
- @Test
- public void testMetricsReporterAutoGeneratedClientId() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testMetricsReporterAutoGeneratedClientId(GroupProtocol groupProtocol) {
Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
- consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
+ consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
- MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metrics.reporters().get(0);
+ MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metricsRegistry().reporters().get(0);
- assertEquals(consumer.getClientId(), mockMetricsReporter.clientId);
- assertEquals(2, consumer.metrics.reporters().size());
+ assertEquals(consumer.clientId(), mockMetricsReporter.clientId);
+ assertEquals(2, consumer.metricsRegistry().reporters().size());
}
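
The test conversions in this file all follow the same mechanical recipe: swap @Test for @ParameterizedTest plus @EnumSource(GroupProtocol.class), thread the protocol into the consumer's config, and narrow the source with names = "GENERIC" where the new protocol does not yet support the scenario. A stripped-down illustration of that JUnit 5 pattern, with a stand-in enum:

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.EnumSource;
    import static org.junit.jupiter.api.Assertions.assertNotNull;

    class EnumSourceSketch {
        enum Protocol { GENERIC, CONSUMER } // stand-in for GroupProtocol

        @ParameterizedTest
        @EnumSource(Protocol.class) // invoked once per enum constant
        void runsForEveryProtocol(Protocol protocol) {
            assertNotNull(protocol);
        }

        @ParameterizedTest
        @EnumSource(value = Protocol.class, names = "GENERIC") // restricted subset
        void runsForGenericOnly(Protocol protocol) {
            assertNotNull(protocol); // only ever called with Protocol.GENERIC
        }
    }
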
- @Test
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
@SuppressWarnings("deprecation")
- public void testDisableJmxReporter() {
+ public void testDisableJmxReporter(GroupProtocol groupProtocol) {
Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
- consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
- assertTrue(consumer.metrics.reporters().isEmpty());
+ consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+ assertTrue(consumer.metricsRegistry().reporters().isEmpty());
}
- @Test
- public void testExplicitlyEnableJmxReporter() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testExplicitlyEnableJmxReporter(GroupProtocol groupProtocol) {
Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
- consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
- assertEquals(1, consumer.metrics.reporters().size());
+ consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
+ assertEquals(1, consumer.metricsRegistry().reporters().size());
}
- @Test
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
@SuppressWarnings("unchecked")
- public void testPollReturnsRecords() {
- consumer = setUpConsumerWithRecordsToPoll(tp0, 5);
+ public void testPollReturnsRecords(GroupProtocol groupProtocol) {
+ consumer = setUpConsumerWithRecordsToPoll(groupProtocol, tp0, 5);
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ZERO);
@@ -266,14 +264,17 @@ public class KafkaConsumerTest {
assertEquals(records.records(tp0).size(), 5);
}
- @Test
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
@SuppressWarnings("unchecked")
- public void testSecondPollWithDeserializationErrorThrowsRecordDeserializationException() {
+ public void testSecondPollWithDeserializationErrorThrowsRecordDeserializationException(GroupProtocol groupProtocol) {
int invalidRecordNumber = 4;
int invalidRecordOffset = 3;
StringDeserializer deserializer = mockErrorDeserializer(invalidRecordNumber);
- consumer = setUpConsumerWithRecordsToPoll(tp0, 5, deserializer);
+ consumer = setUpConsumerWithRecordsToPoll(groupProtocol, tp0, 5, deserializer);
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ZERO);
assertEquals(invalidRecordNumber - 1, records.count());
@@ -317,18 +318,23 @@ public class KafkaConsumerTest {
};
}
- private KafkaConsumer<?, ?> setUpConsumerWithRecordsToPoll(TopicPartition tp, int recordCount) {
- return setUpConsumerWithRecordsToPoll(tp, recordCount, new StringDeserializer());
+ private KafkaConsumer<?, ?> setUpConsumerWithRecordsToPoll(GroupProtocol groupProtocol,
+ TopicPartition tp,
+ int recordCount) {
+ return setUpConsumerWithRecordsToPoll(groupProtocol, tp, recordCount, new StringDeserializer());
}
- private KafkaConsumer<?, ?> setUpConsumerWithRecordsToPoll(TopicPartition tp, int recordCount, Deserializer<String> deserializer) {
+ private KafkaConsumer<?, ?> setUpConsumerWithRecordsToPoll(GroupProtocol groupProtocol,
+ TopicPartition tp,
+ int recordCount,
+ Deserializer<String> deserializer) {
Cluster cluster = TestUtils.singletonCluster(tp.topic(), 1);
Node node = cluster.nodes().get(0);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
- consumer = newConsumer(time, client, subscription, metadata, assignor,
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId, Optional.of(deserializer), false);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp), null);
@@ -337,9 +343,11 @@ public class KafkaConsumerTest {
return consumer;
}
- @Test
- public void testConstructorClose() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testConstructorClose(GroupProtocol groupProtocol) {
Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "invalid-23-8409-adsfsdj");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
@@ -347,7 +355,7 @@ public class KafkaConsumerTest {
final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
try {
- new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ newConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
fail("should have caught an exception and returned");
} catch (KafkaException e) {
assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
@@ -356,44 +364,53 @@ public class KafkaConsumerTest {
}
}
- @Test
- public void testOsDefaultSocketBufferSizes() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testOsDefaultSocketBufferSizes(GroupProtocol groupProtocol) {
Map<String, Object> config = new HashMap<>();
+ config.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
config.put(ConsumerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
- consumer = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ consumer = newConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
- @Test
- public void testInvalidSocketSendBufferSize() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testInvalidSocketSendBufferSize(GroupProtocol groupProtocol) {
Map<String, Object> config = new HashMap<>();
+ config.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
config.put(ConsumerConfig.SEND_BUFFER_CONFIG, -2);
assertThrows(KafkaException.class,
- () -> new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
+ () -> newConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
}
- @Test
- public void testInvalidSocketReceiveBufferSize() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testInvalidSocketReceiveBufferSize(GroupProtocol groupProtocol) {
Map<String, Object> config = new HashMap<>();
+ config.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, -2);
assertThrows(KafkaException.class,
- () -> new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
+ () -> newConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()));
}
- @Test
- public void shouldIgnoreGroupInstanceIdForEmptyGroupId() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void shouldIgnoreGroupInstanceIdForEmptyGroupId(GroupProtocol groupProtocol) {
Map<String, Object> config = new HashMap<>();
+ config.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
config.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance_id");
- consumer = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ consumer = newConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
- @Test
- public void testSubscription() {
- consumer = newConsumer(groupId);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testSubscription(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, groupId);
consumer.subscribe(singletonList(topic));
assertEquals(singleton(topic), consumer.subscription());
@@ -412,93 +429,107 @@ public class KafkaConsumerTest {
assertTrue(consumer.assignment().isEmpty());
}
- @Test
- public void testSubscriptionOnNullTopicCollection() {
- consumer = newConsumer(groupId);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testSubscriptionOnNullTopicCollection(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, groupId);
assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((List<String>) null));
}
- @Test
- public void testSubscriptionOnNullTopic() {
- consumer = newConsumer(groupId);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testSubscriptionOnNullTopic(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, groupId);
assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(null)));
}
- @Test
- public void testSubscriptionOnEmptyTopic() {
- consumer = newConsumer(groupId);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testSubscriptionOnEmptyTopic(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, groupId);
String emptyTopic = " ";
assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
}
- @Test
- public void testSubscriptionOnNullPattern() {
- consumer = newConsumer(groupId);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testSubscriptionOnNullPattern(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, groupId);
assertThrows(IllegalArgumentException.class,
() -> consumer.subscribe((Pattern) null));
}
- @Test
- public void testSubscriptionOnEmptyPattern() {
- consumer = newConsumer(groupId);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testSubscriptionOnEmptyPattern(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, groupId);
assertThrows(IllegalArgumentException.class,
() -> consumer.subscribe(Pattern.compile("")));
}
- @Test
- public void testSubscriptionWithEmptyPartitionAssignment() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol groupProtocol) {
Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- consumer = newConsumer(props);
+ consumer = newConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
assertThrows(IllegalStateException.class,
() -> consumer.subscribe(singletonList(topic)));
}
- @Test
- public void testSeekNegative() {
- consumer = newConsumer((String) null);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testSeekNegative(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, null);
consumer.assign(singleton(new TopicPartition("nonExistTopic", 0)));
assertThrows(IllegalArgumentException.class,
() -> consumer.seek(new TopicPartition("nonExistTopic", 0), -1));
}
- @Test
- public void testAssignOnNullTopicPartition() {
- consumer = newConsumer((String) null);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testAssignOnNullTopicPartition(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, null);
assertThrows(IllegalArgumentException.class, () -> consumer.assign(null));
}
- @Test
- public void testAssignOnEmptyTopicPartition() {
- consumer = newConsumer(groupId);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testAssignOnEmptyTopicPartition(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, groupId);
consumer.assign(Collections.emptyList());
assertTrue(consumer.subscription().isEmpty());
assertTrue(consumer.assignment().isEmpty());
}
- @Test
- public void testAssignOnNullTopicInPartition() {
- consumer = newConsumer((String) null);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testAssignOnNullTopicInPartition(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, null);
assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(null, 0))));
}
- @Test
- public void testAssignOnEmptyTopicInPartition() {
- consumer = newConsumer((String) null);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testAssignOnEmptyTopicInPartition(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, null);
assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(" ", 0))));
}
- @Test
- public void testInterceptorConstructorClose() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testInterceptorConstructorClose(GroupProtocol groupProtocol) {
try {
Properties props = new Properties();
// test with client ID assigned by KafkaConsumer
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName());
- consumer = new KafkaConsumer<>(
+ consumer = newConsumer(
props, new StringDeserializer(), new StringDeserializer());
assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
assertEquals(0, MockConsumerInterceptor.CLOSE_COUNT.get());
@@ -515,12 +546,14 @@ public class KafkaConsumerTest {
}
}
- @Test
- public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances(GroupProtocol groupProtocol) {
final int targetInterceptor = 3;
try {
Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName() + ", "
+ MockConsumerInterceptor.class.getName() + ", "
@@ -528,10 +561,8 @@ public class KafkaConsumerTest {
MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor);
- assertThrows(KafkaException.class, () -> {
- new KafkaConsumer<>(
- props, new StringDeserializer(), new StringDeserializer());
- });
+ assertThrows(KafkaException.class, () -> newConsumer(
+ props, new StringDeserializer(), new StringDeserializer()));
assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
@@ -541,9 +572,10 @@ public class KafkaConsumerTest {
}
}
- @Test
- public void testPause() {
- consumer = newConsumer(groupId);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testPause(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, groupId);
consumer.assign(singletonList(tp0));
assertEquals(singleton(tp0), consumer.assignment());
@@ -559,27 +591,32 @@ public class KafkaConsumerTest {
assertTrue(consumer.paused().isEmpty());
}
- @Test
- public void testConsumerJmxPrefix() throws Exception {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testConsumerJmxPrefix(GroupProtocol groupProtocol) throws Exception {
Map<String, Object> config = new HashMap<>();
+ config.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
config.put(ConsumerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
config.put("client.id", "client-1");
- consumer = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ consumer = newConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
- MetricName testMetricName = consumer.metrics.metricName("test-metric",
+ MetricName testMetricName = consumer.metricsRegistry().metricName("test-metric",
"grp1", "test metric");
- consumer.metrics.addMetric(testMetricName, new Avg());
+ consumer.metricsRegistry().addMetric(testMetricName, new Avg());
assertNotNull(server.getObjectInstance(new ObjectName("kafka.consumer:type=grp1,client-id=client-1")));
}
- private KafkaConsumer<byte[], byte[]> newConsumer(String groupId) {
- return newConsumer(groupId, Optional.empty());
+ private KafkaConsumer<byte[], byte[]> newConsumer(GroupProtocol groupProtocol, String groupId) {
+ return newConsumer(groupProtocol, groupId, Optional.empty());
}
- private KafkaConsumer<byte[], byte[]> newConsumer(String groupId, Optional<Boolean> enableAutoCommit) {
+ private KafkaConsumer<byte[], byte[]> newConsumer(GroupProtocol groupProtocol,
+ String groupId,
+ Optional<Boolean> enableAutoCommit) {
Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my.consumer");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
@@ -587,22 +624,38 @@ public class KafkaConsumerTest {
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
enableAutoCommit.ifPresent(
autoCommit -> props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString()));
- return newConsumer(props);
+ return newConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ }
+
+ private <K, V> KafkaConsumer<K, V> newConsumer(Properties props) {
+ return newConsumer(props, null, null);
+ }
+
+ private <K, V> KafkaConsumer<K, V> newConsumer(Map<String, Object> configs,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer) {
+ return new KafkaConsumer<>(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
+ keyDeserializer, valueDeserializer);
}
- private KafkaConsumer<byte[], byte[]> newConsumer(Properties props) {
- return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ private <K, V> KafkaConsumer<K, V> newConsumer(Properties props,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer) {
+ return newConsumer(propsToMap(props), keyDeserializer, valueDeserializer);
}
- @Test
- public void verifyHeartbeatSent() throws Exception {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void verifyHeartbeatSent(GroupProtocol groupProtocol) throws Exception {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -624,15 +677,18 @@ public class KafkaConsumerTest {
assertTrue(heartbeatReceived.get());
}
- @Test
- public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void verifyHeartbeatSentWhenFetchedDataReady(GroupProtocol groupProtocol) throws Exception {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -654,15 +710,16 @@ public class KafkaConsumerTest {
assertTrue(heartbeatReceived.get());
}
- @Test
- public void verifyPollTimesOutDuringMetadataUpdate() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void verifyPollTimesOutDuringMetadataUpdate(GroupProtocol groupProtocol) {
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
// Since we would enable the heartbeat thread after received join-response which could
// send the sync-group on behalf of the consumer if it is enqueued, we may still complete
@@ -677,16 +734,19 @@ public class KafkaConsumerTest {
assertEquals(0, requests.stream().filter(request -> request.apiKey().equals(ApiKeys.FETCH)).count());
}
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
@SuppressWarnings("deprecation")
- @Test
- public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() {
+ public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate(GroupProtocol groupProtocol) {
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -699,15 +759,16 @@ public class KafkaConsumerTest {
assertEquals(FetchRequest.Builder.class, aClass);
}
- @Test
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
@SuppressWarnings("unchecked")
- public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
+ public void verifyNoCoordinatorLookupForManualAssignmentWithSeek(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, null, groupInstanceId, false);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, null, groupInstanceId, false);
consumer.assign(singleton(tp0));
consumer.seekToBeginning(singleton(tp0));
@@ -721,8 +782,11 @@ public class KafkaConsumerTest {
assertEquals(55L, consumer.position(tp0));
}
- @Test
- public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -730,7 +794,7 @@ public class KafkaConsumerTest {
Node node = metadata.fetch().nodes().get(0);
// create a consumer with groupID with manual assignment
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(singleton(tp0));
// 1st coordinator error should cause coordinator unknown
@@ -757,8 +821,11 @@ public class KafkaConsumerTest {
assertEquals(55, consumer.committed(Collections.singleton(tp0), Duration.ZERO).get(tp0).offset());
}
- @Test
- public void testFetchProgressWithMissingPartitionPosition() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testFetchProgressWithMissingPartitionPosition(GroupProtocol groupProtocol) {
// Verifies that we can make progress on one partition while we are awaiting
// a reset on another partition.
@@ -767,7 +834,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumerNoAutoCommit(time, client, subscription, metadata);
+ consumer = newConsumerNoAutoCommit(groupProtocol, time, client, subscription, metadata);
consumer.assign(Arrays.asList(tp0, tp1));
consumer.seekToEnd(singleton(tp0));
consumer.seekToBeginning(singleton(tp1));
@@ -814,8 +881,11 @@ public class KafkaConsumerTest {
mockClient.updateMetadata(initialMetadata);
}
- @Test
- public void testMissingOffsetNoResetPolicy() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) {
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -823,7 +893,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor,
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId, false);
consumer.assign(singletonList(tp0));
@@ -835,8 +905,11 @@ public class KafkaConsumerTest {
assertThrows(NoOffsetForPartitionException.class, () -> consumer.poll(Duration.ZERO));
}
- @Test
- public void testResetToCommittedOffset() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testResetToCommittedOffset(GroupProtocol groupProtocol) {
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -844,7 +917,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor,
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId, false);
consumer.assign(singletonList(tp0));
@@ -857,8 +930,11 @@ public class KafkaConsumerTest {
assertEquals(539L, consumer.position(tp0));
}
- @Test
- public void testResetUsingAutoResetPolicy() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testResetUsingAutoResetPolicy(GroupProtocol groupProtocol) {
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -866,7 +942,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor,
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId, false);
consumer.assign(singletonList(tp0));
@@ -881,15 +957,16 @@ public class KafkaConsumerTest {
assertEquals(50L, consumer.position(tp0));
}
- @Test
- public void testOffsetIsValidAfterSeek() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testOffsetIsValidAfterSeek(GroupProtocol groupProtocol) {
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
- consumer = newConsumer(time, client, subscription, metadata, assignor,
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
true, groupId, Optional.empty(), false);
consumer.assign(singletonList(tp0));
consumer.seek(tp0, 20L);
@@ -897,8 +974,11 @@ public class KafkaConsumerTest {
assertEquals(subscription.validPosition(tp0).offset, 20L);
}
- @Test
- public void testCommitsFetchedDuringAssign() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testCommitsFetchedDuringAssign(GroupProtocol groupProtocol) {
long offset1 = 10000;
long offset2 = 20000;
@@ -908,7 +988,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(singletonList(tp0));
// lookup coordinator
@@ -933,22 +1013,31 @@ public class KafkaConsumerTest {
assertEquals(offset2, consumer.committed(Collections.singleton(tp1)).get(tp1).offset());
}
- @Test
- public void testFetchStableOffsetThrowInCommitted() {
- assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer().committed(Collections.singleton(tp0)));
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testFetchStableOffsetThrowInCommitted(GroupProtocol groupProtocol) {
+ assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).committed(Collections.singleton(tp0)));
}
- @Test
- public void testFetchStableOffsetThrowInPoll() {
- assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer().poll(Duration.ZERO));
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) {
+ assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).poll(Duration.ZERO));
}
- @Test
- public void testFetchStableOffsetThrowInPosition() {
- assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer().position(tp0));
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testFetchStableOffsetThrowInPosition(GroupProtocol groupProtocol) {
+ assertThrows(UnsupportedVersionException.class, () -> setupThrowableConsumer(groupProtocol).position(tp0));
}
- private KafkaConsumer<?, ?> setupThrowableConsumer() {
+ private KafkaConsumer<?, ?> setupThrowableConsumer(GroupProtocol groupProtocol) {
long offset1 = 10000;
ConsumerMetadata metadata = createMetadata(subscription);
@@ -960,7 +1049,7 @@ public class KafkaConsumerTest {
Node node = metadata.fetch().nodes().get(0);
consumer = newConsumer(
- time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, true);
+ groupProtocol, time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, true);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
@@ -971,8 +1060,11 @@ public class KafkaConsumerTest {
return consumer;
}
- @Test
- public void testNoCommittedOffsets() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testNoCommittedOffsets(GroupProtocol groupProtocol) {
long offset1 = 10000;
ConsumerMetadata metadata = createMetadata(subscription);
@@ -981,7 +1073,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(Arrays.asList(tp0, tp1));
// lookup coordinator
@@ -996,15 +1088,18 @@ public class KafkaConsumerTest {
assertNull(committed.get(tp1));
}
- @Test
- public void testAutoCommitSentBeforePositionUpdate() {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testAutoCommitSentBeforePositionUpdate(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1027,8 +1122,11 @@ public class KafkaConsumerTest {
assertTrue(commitReceived.get());
}
- @Test
- public void testRegexSubscription() {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testRegexSubscription(GroupProtocol groupProtocol) {
String unmatchedTopic = "unmatched";
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1040,7 +1138,7 @@ public class KafkaConsumerTest {
initMetadata(client, partitionCounts);
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
@@ -1053,8 +1151,11 @@ public class KafkaConsumerTest {
assertEquals(singleton(tp0), consumer.assignment());
}
- @Test
- public void testChangingRegexSubscription() {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testChangingRegexSubscription(GroupProtocol groupProtocol) {
String otherTopic = "other";
TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);
@@ -1068,7 +1169,7 @@ public class KafkaConsumerTest {
initMetadata(client, partitionCounts);
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
@@ -1087,15 +1188,18 @@ public class KafkaConsumerTest {
assertEquals(singleton(otherTopic), consumer.subscription());
}
- @Test
- public void testWakeupWithFetchDataAvailable() throws Exception {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testWakeupWithFetchDataAvailable(GroupProtocol groupProtocol) throws Exception {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1125,15 +1229,18 @@ public class KafkaConsumerTest {
exec.awaitTermination(5L, TimeUnit.SECONDS);
}
- @Test
- public void testPollThrowsInterruptExceptionIfInterrupted() {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol groupProtocol) {
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1150,15 +1257,16 @@ public class KafkaConsumerTest {
}
}
- @Test
- public void fetchResponseWithUnexpectedPartitionIsIgnored() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void fetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1182,9 +1290,12 @@ public class KafkaConsumerTest {
* Upon unsubscribing from subscribed topics, the consumer subscription and assignment
* are both updated right away, but its consumed offsets are not auto-committed.
*/
- @Test
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
@SuppressWarnings("unchecked")
- public void testSubscriptionChangesWithAutoCommitEnabled() {
+ public void testSubscriptionChangesWithAutoCommitEnabled(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1197,7 +1308,7 @@ public class KafkaConsumerTest {
ConsumerPartitionAssignor assignor = new RangeAssignor();
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
// initial subscription
consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
@@ -1295,8 +1406,11 @@ public class KafkaConsumerTest {
* Upon unsubscribing from subscribed topics, the assigned partitions immediately
* change but if auto-commit is disabled the consumer offsets are not committed.
*/
- @Test
- public void testSubscriptionChangesWithAutoCommitDisabled() {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testSubscriptionChangesWithAutoCommitDisabled(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1308,7 +1422,7 @@ public class KafkaConsumerTest {
ConsumerPartitionAssignor assignor = new RangeAssignor();
- consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
initializeSubscriptionWithSingleTopic(consumer, getConsumerRebalanceListener(consumer));
@@ -1349,8 +1463,11 @@ public class KafkaConsumerTest {
client.requests().clear();
}
- @Test
- public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration() {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1358,7 +1475,7 @@ public class KafkaConsumerTest {
Node node = metadata.fetch().nodes().get(0);
CooperativeStickyAssignor assignor = new CooperativeStickyAssignor();
- consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
initializeSubscriptionWithSingleTopic(consumer, getExceptionConsumerRebalanceListener());
@@ -1372,8 +1489,11 @@ public class KafkaConsumerTest {
assertEquals(partitionRevoked + singleTopicPartition, unsubscribeException.getCause().getMessage());
}
- @Test
- public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration() throws Exception {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration(GroupProtocol groupProtocol) throws Exception {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1381,7 +1501,7 @@ public class KafkaConsumerTest {
Node node = metadata.fetch().nodes().get(0);
CooperativeStickyAssignor assignor = new CooperativeStickyAssignor();
- consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
initializeSubscriptionWithSingleTopic(consumer, getExceptionConsumerRebalanceListener());
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1407,9 +1527,12 @@ public class KafkaConsumerTest {
assertEquals(Collections.emptySet(), consumer.assignment());
}
- @Test
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
@SuppressWarnings("unchecked")
- public void testManualAssignmentChangeWithAutoCommitEnabled() {
+ public void testManualAssignmentChangeWithAutoCommitEnabled(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1421,7 +1544,7 @@ public class KafkaConsumerTest {
ConsumerPartitionAssignor assignor = new RangeAssignor();
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
// lookup coordinator
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
@@ -1462,8 +1585,11 @@ public class KafkaConsumerTest {
client.requests().clear();
}
- @Test
- public void testManualAssignmentChangeWithAutoCommitDisabled() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testManualAssignmentChangeWithAutoCommitDisabled(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1475,7 +1601,7 @@ public class KafkaConsumerTest {
ConsumerPartitionAssignor assignor = new RangeAssignor();
- consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
// lookup coordinator
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
@@ -1517,8 +1643,11 @@ public class KafkaConsumerTest {
client.requests().clear();
}
- @Test
- public void testOffsetOfPausedPartitions() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testOffsetOfPausedPartitions(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1527,7 +1656,7 @@ public class KafkaConsumerTest {
ConsumerPartitionAssignor assignor = new RangeAssignor();
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
// lookup coordinator
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
@@ -1567,38 +1696,47 @@ public class KafkaConsumerTest {
consumer.unsubscribe();
}
- @Test
- public void testPollWithNoSubscription() {
- consumer = newConsumer((String) null);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testPollWithNoSubscription(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, null);
assertThrows(IllegalStateException.class, () -> consumer.poll(Duration.ZERO));
}
- @Test
- public void testPollWithEmptySubscription() {
- consumer = newConsumer(groupId);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testPollWithEmptySubscription(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, groupId);
consumer.subscribe(Collections.emptyList());
assertThrows(IllegalStateException.class, () -> consumer.poll(Duration.ZERO));
}
- @Test
- public void testPollWithEmptyUserAssignment() {
- consumer = newConsumer(groupId);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testPollWithEmptyUserAssignment(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, groupId);
consumer.assign(Collections.emptySet());
assertThrows(IllegalStateException.class, () -> consumer.poll(Duration.ZERO));
}
- @Test
- public void testGracefulClose() throws Exception {
+ // TODO: this test references RPCs to be sent that are not part of the CONSUMER group protocol.
+ // We are deferring any attempts at generalizing this test for both group protocols to the future.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testGracefulClose(GroupProtocol groupProtocol) throws Exception {
Map<TopicPartition, Errors> response = new HashMap<>();
response.put(tp0, Errors.NONE);
OffsetCommitResponse commitResponse = offsetCommitResponse(response);
LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()));
FetchResponse closeResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>());
- consumerCloseTest(5000, Arrays.asList(commitResponse, leaveGroupResponse, closeResponse), 0, false);
+ consumerCloseTest(groupProtocol, 5000, Arrays.asList(commitResponse, leaveGroupResponse, closeResponse), 0, false);
}
- @Test
- public void testCloseTimeoutDueToNoResponseForCloseFetchRequest() throws Exception {
+ // TODO: this test references RPCs to be sent that are not part of the CONSUMER group protocol.
+ // We are deferring any attempts at generalizing this test for both group protocols to the future.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testCloseTimeoutDueToNoResponseForCloseFetchRequest(GroupProtocol groupProtocol) throws Exception {
Map<TopicPartition, Errors> response = new HashMap<>();
response.put(tp0, Errors.NONE);
OffsetCommitResponse commitResponse = offsetCommitResponse(response);
@@ -1610,39 +1748,54 @@ public class KafkaConsumerTest {
// than configured timeout.
final int closeTimeoutMs = 5000;
final int waitForCloseCompletionMs = closeTimeoutMs + 1000;
- consumerCloseTest(closeTimeoutMs, serverResponsesWithoutCloseResponse, waitForCloseCompletionMs, false);
+ consumerCloseTest(groupProtocol, closeTimeoutMs, serverResponsesWithoutCloseResponse, waitForCloseCompletionMs, false);
}
- @Test
- public void testCloseTimeout() throws Exception {
- consumerCloseTest(5000, Collections.emptyList(), 5000, false);
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testCloseTimeout(GroupProtocol groupProtocol) throws Exception {
+ consumerCloseTest(groupProtocol, 5000, Collections.emptyList(), 5000, false);
}
- @Test
- public void testLeaveGroupTimeout() throws Exception {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testLeaveGroupTimeout(GroupProtocol groupProtocol) throws Exception {
Map<TopicPartition, Errors> response = new HashMap<>();
response.put(tp0, Errors.NONE);
OffsetCommitResponse commitResponse = offsetCommitResponse(response);
- consumerCloseTest(5000, singletonList(commitResponse), 5000, false);
+ consumerCloseTest(groupProtocol, 5000, singletonList(commitResponse), 5000, false);
}
- @Test
- public void testCloseNoWait() throws Exception {
- consumerCloseTest(0, Collections.emptyList(), 0, false);
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testCloseNoWait(GroupProtocol groupProtocol) throws Exception {
+ consumerCloseTest(groupProtocol, 0, Collections.emptyList(), 0, false);
}
- @Test
- public void testCloseInterrupt() throws Exception {
- consumerCloseTest(Long.MAX_VALUE, Collections.emptyList(), 0, true);
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testCloseInterrupt(GroupProtocol groupProtocol) throws Exception {
+ consumerCloseTest(groupProtocol, Long.MAX_VALUE, Collections.emptyList(), 0, true);
}
- @Test
- public void testCloseShouldBeIdempotent() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testCloseShouldBeIdempotent(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = spy(new MockClient(time, metadata));
initMetadata(client, singletonMap(topic, 1));
- consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.close(Duration.ZERO);
consumer.close(Duration.ZERO);
@@ -1651,47 +1804,49 @@ public class KafkaConsumerTest {
verify(client).close();
}
- @Test
- public void testOperationsBySubscribingConsumerWithDefaultGroupId() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testOperationsBySubscribingConsumerWithDefaultGroupId(GroupProtocol groupProtocol) {
try {
- newConsumer(null, Optional.of(Boolean.TRUE));
+ newConsumer(groupProtocol, null, Optional.of(Boolean.TRUE));
fail("Expected an InvalidConfigurationException");
- } catch (KafkaException e) {
- assertEquals(InvalidConfigurationException.class, e.getCause().getClass());
+ } catch (InvalidConfigurationException e) {
+ // OK, expected
}
try {
- newConsumer((String) null).subscribe(Collections.singleton(topic));
+ newConsumer(groupProtocol, null).subscribe(Collections.singleton(topic));
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}
try {
- newConsumer((String) null).committed(Collections.singleton(tp0)).get(tp0);
+ newConsumer(groupProtocol, null).committed(Collections.singleton(tp0)).get(tp0);
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}
try {
- newConsumer((String) null).commitAsync();
+ newConsumer(groupProtocol, null).commitAsync();
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}
try {
- newConsumer((String) null).commitSync();
+ newConsumer(groupProtocol, null).commitSync();
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}
}
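// --- Sketch (illustrative; createMisconfiguredConsumer is a hypothetical
// stand-in for the newConsumer call above): the try/fail/catch pattern in this
// test is equivalent to JUnit 5's assertThrows, which also returns the thrown
// exception for further inspection.
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.junit.jupiter.api.Test;

class AssertThrowsSketch {
    private void createMisconfiguredConsumer() {
        throw new InvalidConfigurationException("missing group.id");
    }

    @Test
    void expectedExceptionViaAssertThrows() {
        InvalidConfigurationException e =
            assertThrows(InvalidConfigurationException.class, this::createMisconfiguredConsumer);
        // e.getMessage() etc. can be asserted on directly
    }
}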
- @Test
- public void testOperationsByAssigningConsumerWithDefaultGroupId() {
- KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testOperationsByAssigningConsumerWithDefaultGroupId(GroupProtocol groupProtocol) {
+ KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupProtocol, null);
consumer.assign(singleton(tp0));
try {
@@ -1716,30 +1871,35 @@ public class KafkaConsumerTest {
}
}
- @Test
- public void testMetricConfigRecordingLevelInfo() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testMetricConfigRecordingLevelInfo(GroupProtocol groupProtocol) {
Properties props = new Properties();
+ props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
- KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
- assertEquals(Sensor.RecordingLevel.INFO, consumer.metrics.config().recordLevel());
+ KafkaConsumer<byte[], byte[]> consumer = newConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ assertEquals(Sensor.RecordingLevel.INFO, consumer.metricsRegistry().config().recordLevel());
consumer.close(Duration.ZERO);
props.put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
- KafkaConsumer<byte[], byte[]> consumer2 = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
- assertEquals(Sensor.RecordingLevel.DEBUG, consumer2.metrics.config().recordLevel());
+ KafkaConsumer<byte[], byte[]> consumer2 = newConsumer(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ assertEquals(Sensor.RecordingLevel.DEBUG, consumer2.metricsRegistry().config().recordLevel());
consumer2.close(Duration.ZERO);
}
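// --- Sketch (illustrative; only Metrics is a real type here, the other names
// are hypothetical stand-ins): why the assertions in this file moved from the
// package-private `consumer.metrics` field to the `consumer.metricsRegistry()`
// accessor. Once KafkaConsumer forwards to an internal delegate, state such as
// the metrics registry lives behind the delegate, so tests reach it through a
// forwarding accessor rather than a field.
import org.apache.kafka.common.metrics.Metrics;

interface ConsumerDelegateSketch {
    Metrics metricsRegistry();
}

class DelegatingConsumerSketch {
    private final ConsumerDelegateSketch delegate; // legacy or async variant

    DelegatingConsumerSketch(ConsumerDelegateSketch delegate) {
        this.delegate = delegate;
    }

    Metrics metricsRegistry() {
        return delegate.metricsRegistry(); // forward; no shared field to expose
    }
}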
- @Test
+ // TODO: this test references RPCs to be sent that are not part of the CONSUMER group protocol.
+ // We are deferring any attempts at generalizing this test for both group protocols to the future.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
@SuppressWarnings("unchecked")
- public void testShouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
+ public void testShouldAttemptToRejoinGroupAfterSyncGroupFailed(GroupProtocol groupProtocol) throws Exception {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
@@ -1797,7 +1957,8 @@ public class KafkaConsumerTest {
assertFalse(records.isEmpty());
}
- private void consumerCloseTest(final long closeTimeoutMs,
+ private void consumerCloseTest(GroupProtocol groupProtocol,
+ final long closeTimeoutMs,
List<? extends AbstractResponse> responses,
long waitMs,
boolean interrupt) throws Exception {
@@ -1807,7 +1968,7 @@ public class KafkaConsumerTest {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, Optional.empty());
+ final KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, Optional.empty());
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
@@ -1893,8 +2054,11 @@ public class KafkaConsumerTest {
}
}
- @Test
- public void testPartitionsForNonExistingTopic() {
+ // TODO: this test requires topic metadata logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testPartitionsForNonExistingTopic(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1907,59 +2071,83 @@ public class KafkaConsumerTest {
Collections.emptyList());
client.prepareResponse(updateResponse);
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
assertEquals(Collections.emptyList(), consumer.partitionsFor("non-exist-topic"));
}
- @Test
- public void testPartitionsForAuthenticationFailure() {
- final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+ // TODO: this test requires topic metadata logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testPartitionsForAuthenticationFailure(GroupProtocol groupProtocol) {
+ final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
assertThrows(AuthenticationException.class, () -> consumer.partitionsFor("some other topic"));
}
- @Test
- public void testBeginningOffsetsAuthenticationFailure() {
- final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testBeginningOffsetsAuthenticationFailure(GroupProtocol groupProtocol) {
+ final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
assertThrows(AuthenticationException.class, () -> consumer.beginningOffsets(Collections.singleton(tp0)));
}
- @Test
- public void testEndOffsetsAuthenticationFailure() {
- final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testEndOffsetsAuthenticationFailure(GroupProtocol groupProtocol) {
+ final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
assertThrows(AuthenticationException.class, () -> consumer.endOffsets(Collections.singleton(tp0)));
}
- @Test
- public void testPollAuthenticationFailure() {
- final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testPollAuthenticationFailure(GroupProtocol groupProtocol) {
+ final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
consumer.subscribe(singleton(topic));
assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO));
}
- @Test
- public void testOffsetsForTimesAuthenticationFailure() {
- final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testOffsetsForTimesAuthenticationFailure(GroupProtocol groupProtocol) {
+ final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
assertThrows(AuthenticationException.class, () -> consumer.offsetsForTimes(singletonMap(tp0, 0L)));
}
- @Test
- public void testCommitSyncAuthenticationFailure() {
- final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testCommitSyncAuthenticationFailure(GroupProtocol groupProtocol) {
+ final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(tp0, new OffsetAndMetadata(10L));
assertThrows(AuthenticationException.class, () -> consumer.commitSync(offsets));
}
- @Test
- public void testCommittedAuthenticationFailure() {
- final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testCommittedAuthenticationFailure(GroupProtocol groupProtocol) {
+ final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
assertThrows(AuthenticationException.class, () -> consumer.committed(Collections.singleton(tp0)).get(tp0));
}
- @Test
- public void testMeasureCommitSyncDurationOnFailure() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testMeasureCommitSyncDurationOnFailure(GroupProtocol groupProtocol) {
final KafkaConsumer<String, String> consumer
- = consumerWithPendingError(new MockTime(Duration.ofSeconds(1).toMillis()));
+ = consumerWithPendingError(groupProtocol, new MockTime(Duration.ofSeconds(1).toMillis()));
try {
consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(10L)));
@@ -1967,12 +2155,15 @@ public class KafkaConsumerTest {
}
final Metric metric = consumer.metrics()
- .get(consumer.metrics.metricName("commit-sync-time-ns-total", "consumer-metrics"));
+ .get(consumer.metricsRegistry().metricName("commit-sync-time-ns-total", "consumer-metrics"));
assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
}
- @Test
- public void testMeasureCommitSyncDuration() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testMeasureCommitSyncDuration(GroupProtocol groupProtocol) {
Time time = new MockTime(Duration.ofSeconds(1).toMillis());
SubscriptionState subscription = new SubscriptionState(new LogContext(),
OffsetResetStrategy.EARLIEST);
@@ -1980,7 +2171,7 @@ public class KafkaConsumerTest {
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
assignor, true, groupInstanceId);
consumer.assign(singletonList(tp0));
@@ -1995,14 +2186,17 @@ public class KafkaConsumerTest {
consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(10L)));
final Metric metric = consumer.metrics()
- .get(consumer.metrics.metricName("commit-sync-time-ns-total", "consumer-metrics"));
+ .get(consumer.metricsRegistry().metricName("commit-sync-time-ns-total", "consumer-metrics"));
assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
}
- @Test
- public void testMeasureCommittedDurationOnFailure() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testMeasureCommittedDurationOnFailure(GroupProtocol groupProtocol) {
final KafkaConsumer<String, String> consumer
- = consumerWithPendingError(new MockTime(Duration.ofSeconds(1).toMillis()));
+ = consumerWithPendingError(groupProtocol, new MockTime(Duration.ofSeconds(1).toMillis()));
try {
consumer.committed(Collections.singleton(tp0));
@@ -2010,12 +2204,15 @@ public class KafkaConsumerTest {
}
final Metric metric = consumer.metrics()
- .get(consumer.metrics.metricName("committed-time-ns-total", "consumer-metrics"));
+ .get(consumer.metricsRegistry().metricName("committed-time-ns-total", "consumer-metrics"));
assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
}
- @Test
- public void testMeasureCommittedDuration() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testMeasureCommittedDuration(GroupProtocol groupProtocol) {
long offset1 = 10000;
Time time = new MockTime(Duration.ofSeconds(1).toMillis());
SubscriptionState subscription = new SubscriptionState(new LogContext(),
@@ -2024,7 +2221,7 @@ public class KafkaConsumerTest {
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
assignor, true, groupInstanceId);
consumer.assign(singletonList(tp0));
@@ -2040,19 +2237,22 @@ public class KafkaConsumerTest {
consumer.committed(Collections.singleton(tp0)).get(tp0).offset();
final Metric metric = consumer.metrics()
- .get(consumer.metrics.metricName("committed-time-ns-total", "consumer-metrics"));
+ .get(consumer.metricsRegistry().metricName("committed-time-ns-total", "consumer-metrics"));
assertTrue((Double) metric.metricValue() >= Duration.ofMillis(999).toNanos());
}
- @Test
- public void testRebalanceException() {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testRebalanceException(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getExceptionConsumerRebalanceListener());
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
@@ -2086,13 +2286,16 @@ public class KafkaConsumerTest {
assertTrue(subscription.assignedPartitions().isEmpty());
}
- @Test
- public void testReturnRecordsDuringRebalance() throws InterruptedException {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) throws InterruptedException {
Time time = new MockTime(1L);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor();
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
@@ -2211,15 +2414,18 @@ public class KafkaConsumerTest {
consumer.close(Duration.ZERO);
}
- @Test
- public void testGetGroupMetadata() {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testGetGroupMetadata(GroupProtocol groupProtocol) {
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
final Node node = metadata.fetch().nodes().get(0);
- final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ final KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
final ConsumerGroupMetadata groupMetadataOnStart = consumer.groupMetadata();
assertEquals(groupId, groupMetadataOnStart.groupId());
@@ -2241,12 +2447,15 @@ public class KafkaConsumerTest {
assertEquals(groupInstanceId, groupMetadataAfterPoll.groupInstanceId());
}
- @Test
- public void testInvalidGroupMetadata() throws InterruptedException {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testInvalidGroupMetadata(GroupProtocol groupProtocol) throws InterruptedException {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
new RoundRobinAssignor(), true, groupInstanceId);
consumer.subscribe(singletonList(topic));
// concurrent access is illegal
@@ -2268,15 +2477,18 @@ public class KafkaConsumerTest {
assertThrows(IllegalStateException.class, consumer::groupMetadata);
}
- @Test
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
@SuppressWarnings("unchecked")
- public void testCurrentLag() {
+ public void testCurrentLag(GroupProtocol groupProtocol) {
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, singletonMap(topic, 1));
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
// throws for unassigned partition
assertThrows(IllegalStateException.class, () -> consumer.currentLag(tp0));
@@ -2321,14 +2533,17 @@ public class KafkaConsumerTest {
assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
}
- @Test
- public void testListOffsetShouldUpdateSubscriptions() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) {
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
initMetadata(client, singletonMap(topic, 1));
- consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(singleton(tp0));
@@ -2344,7 +2559,8 @@ public class KafkaConsumerTest {
assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
}
- private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(final Time time) {
+ private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(GroupProtocol groupProtocol,
+ final Time time) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -2354,15 +2570,15 @@ public class KafkaConsumerTest {
ConsumerPartitionAssignor assignor = new RangeAssignor();
client.createPendingAuthenticationError(node, 0);
- return newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+ return newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
}
- private KafkaConsumer<String, String> consumerWithPendingAuthenticationError() {
- return consumerWithPendingAuthenticationError(new MockTime());
+ private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(GroupProtocol groupProtocol) {
+ return consumerWithPendingAuthenticationError(groupProtocol, new MockTime());
}
- private KafkaConsumer<String, String> consumerWithPendingError(final Time time) {
- return consumerWithPendingAuthenticationError(time);
+ private KafkaConsumer<String, String> consumerWithPendingError(GroupProtocol groupProtocol, final Time time) {
+ return consumerWithPendingAuthenticationError(groupProtocol, time);
}
private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<?, ?> consumer) {
@@ -2512,6 +2728,7 @@ public class KafkaConsumerTest {
partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(),
Optional.empty(), "", error));
}
+ int throttleMs = 10;
return new OffsetFetchResponse(
throttleMs,
Collections.singletonMap(groupId, Errors.NONE),
@@ -2561,11 +2778,12 @@ public class KafkaConsumerTest {
if (fetchCount == 0) {
records = MemoryRecords.EMPTY;
} else {
- MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
- TimestampType.CREATE_TIME, fetchOffset);
- for (int i = 0; i < fetchCount; i++)
- builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
- records = builder.build();
+ try (MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+ TimestampType.CREATE_TIME, fetchOffset)) {
+ for (int i = 0; i < fetchCount; i++)
+ builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
+ records = builder.build();
+ }
}
tpResponses.put(new TopicIdPartition(topicIds.get(partition.topic()), partition),
new FetchResponseData.PartitionData()
@@ -2582,24 +2800,49 @@ public class KafkaConsumerTest {
return fetchResponse(Collections.singletonMap(partition, fetchInfo));
}
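// --- Sketch (a standalone restatement of the try-with-resources change just
// above; nothing here is new API): the MemoryRecordsBuilder is closed when the
// try block exits, so its buffer is released even if an append throws.
import java.nio.ByteBuffer;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;

public class RecordsBuilderSketch {
    static MemoryRecords buildRecords(long fetchOffset, int fetchCount) {
        try (MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024),
                CompressionType.NONE, TimestampType.CREATE_TIME, fetchOffset)) {
            for (int i = 0; i < fetchCount; i++)
                builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
            return builder.build();
        }
    }
}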
- private KafkaConsumer<String, String> newConsumer(Time time,
+ private KafkaConsumer<String, String> newConsumer(GroupProtocol groupProtocol,
+ Time time,
KafkaClient client,
SubscriptionState subscription,
ConsumerMetadata metadata,
ConsumerPartitionAssignor assignor,
boolean autoCommitEnabled,
Optional<String> groupInstanceId) {
- return newConsumer(time, client, subscription, metadata, assignor, autoCommitEnabled, groupId, groupInstanceId, false);
+ return newConsumer(
+ groupProtocol,
+ time,
+ client,
+ subscription,
+ metadata,
+ assignor,
+ autoCommitEnabled,
+ groupId,
+ groupInstanceId,
+ false
+ );
}
- private KafkaConsumer<String, String> newConsumerNoAutoCommit(Time time,
+ private KafkaConsumer<String, String> newConsumerNoAutoCommit(GroupProtocol groupProtocol,
+ Time time,
KafkaClient client,
SubscriptionState subscription,
ConsumerMetadata metadata) {
- return newConsumer(time, client, subscription, metadata, new RangeAssignor(), false, groupId, groupInstanceId, false);
+ return newConsumer(
+ groupProtocol,
+ time,
+ client,
+ subscription,
+ metadata,
+ new RangeAssignor(),
+ false,
+ groupId,
+ groupInstanceId,
+ false
+ );
}
- private KafkaConsumer<String, String> newConsumer(Time time,
+ private KafkaConsumer<String, String> newConsumer(GroupProtocol groupProtocol,
+ Time time,
KafkaClient client,
SubscriptionState subscription,
ConsumerMetadata metadata,
@@ -2608,22 +2851,64 @@ public class KafkaConsumerTest {
String groupId,
Optional<String> groupInstanceId,
boolean throwOnStableOffsetNotSupported) {
- return newConsumer(time, client, subscription, metadata, assignor, autoCommitEnabled, groupId, groupInstanceId,
- Optional.of(new StringDeserializer()), throwOnStableOffsetNotSupported);
+ return newConsumer(
+ groupProtocol,
+ time,
+ client,
+ subscription,
+ metadata,
+ assignor,
+ autoCommitEnabled,
+ groupId,
+ groupInstanceId,
+ Optional.of(new StringDeserializer()),
+ throwOnStableOffsetNotSupported
+ );
}
- private KafkaConsumer<String, String> newConsumer(Time time,
+ private KafkaConsumer<String, String> newConsumer(GroupProtocol groupProtocol,
+ Time time,
KafkaClient client,
- SubscriptionState subscription,
+ SubscriptionState subscriptions,
ConsumerMetadata metadata,
ConsumerPartitionAssignor assignor,
boolean autoCommitEnabled,
String groupId,
Optional<String> groupInstanceId,
- Optional<Deserializer<String>> valueDeserializer,
+ Optional<Deserializer<String>> valueDeserializerOpt,
boolean throwOnStableOffsetNotSupported) {
+ Deserializer<String> keyDeserializer = new StringDeserializer();
+ Deserializer<String> valueDeserializer = valueDeserializerOpt.orElse(new StringDeserializer());
+ LogContext logContext = new LogContext();
+ List<ConsumerPartitionAssignor> assignors = singletonList(assignor);
+ ConsumerConfig config = newConsumerConfig(
+ groupProtocol,
+ autoCommitEnabled,
+ groupId,
+ groupInstanceId,
+ valueDeserializer,
+ throwOnStableOffsetNotSupported
+ );
+ return new KafkaConsumer<>(
+ logContext,
+ time,
+ config,
+ keyDeserializer,
+ valueDeserializer,
+ client,
+ subscriptions,
+ metadata,
+ assignors
+ );
+ }
+
+ private ConsumerConfig newConsumerConfig(GroupProtocol groupProtocol,
+ boolean autoCommitEnabled,
+ String groupId,
+ Optional<String> groupInstanceId,
+ Deserializer<String> valueDeserializer,
+ boolean throwOnStableOffsetNotSupported) {
String clientId = "mock-consumer";
- String metricGroupPrefix = "consumer";
long retryBackoffMs = 100;
long retryBackoffMaxMs = 1000;
int minBytes = 1;
@@ -2633,101 +2918,35 @@ public class KafkaConsumerTest {
int maxPollRecords = Integer.MAX_VALUE;
boolean checkCrcs = true;
int rebalanceTimeoutMs = 60000;
+ int requestTimeoutMs = defaultApiTimeoutMs / 2;
- Deserializer<String> keyDeserializer = new StringDeserializer();
- Deserializer<String> deserializer = valueDeserializer.orElse(new StringDeserializer());
-
- List<ConsumerPartitionAssignor> assignors = singletonList(assignor);
- ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.emptyList());
-
- Metrics metrics = new Metrics(time);
- ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix);
-
- LogContext loggerFactory = new LogContext();
- ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time,
- retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs);
-
- ConsumerCoordinator consumerCoordinator = null;
- if (groupId != null) {
- GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
- rebalanceTimeoutMs,
- heartbeatIntervalMs,
- groupId,
- groupInstanceId,
- retryBackoffMs,
- retryBackoffMaxMs,
- true);
- consumerCoordinator = new ConsumerCoordinator(rebalanceConfig,
- loggerFactory,
- consumerClient,
- assignors,
- metadata,
- subscription,
- metrics,
- metricGroupPrefix,
- time,
- autoCommitEnabled,
- autoCommitIntervalMs,
- interceptors,
- throwOnStableOffsetNotSupported,
- null);
- }
- IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
- FetchMetricsManager metricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
- FetchConfig fetchConfig = new FetchConfig(
- minBytes,
- maxBytes,
- maxWaitMs,
- fetchSize,
- maxPollRecords,
- checkCrcs,
- CommonClientConfigs.DEFAULT_CLIENT_RACK,
- isolationLevel);
- Fetcher<String, String> fetcher = new Fetcher<>(
- loggerFactory,
- consumerClient,
- metadata,
- subscription,
- fetchConfig,
- new Deserializers<>(keyDeserializer, deserializer),
- metricsManager,
- time,
- new ApiVersions());
- OffsetFetcher offsetFetcher = new OffsetFetcher(loggerFactory,
- consumerClient,
- metadata,
- subscription,
- time,
- retryBackoffMs,
- requestTimeoutMs,
- isolationLevel,
- new ApiVersions());
- TopicMetadataFetcher topicMetadataFetcher = new TopicMetadataFetcher(loggerFactory,
- consumerClient,
- retryBackoffMs,
- retryBackoffMaxMs);
-
- return new KafkaConsumer<>(
- loggerFactory,
- clientId,
- consumerCoordinator,
- keyDeserializer,
- deserializer,
- fetcher,
- offsetFetcher,
- topicMetadataFetcher,
- interceptors,
- time,
- consumerClient,
- metrics,
- subscription,
- metadata,
- retryBackoffMs,
- retryBackoffMaxMs,
- requestTimeoutMs,
- defaultApiTimeoutMs,
- assignors,
- groupId);
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
+ configs.put(ConsumerConfig.CHECK_CRCS_CONFIG, checkCrcs);
+ configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+ configs.put(ConsumerConfig.CLIENT_RACK_CONFIG, CommonClientConfigs.DEFAULT_CLIENT_RACK);
+ configs.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, defaultApiTimeoutMs);
+ configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitEnabled);
+ configs.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, maxBytes);
+ configs.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, maxWaitMs);
+ configs.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, minBytes);
+ configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
+ configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs);
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchSize);
+ configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, rebalanceTimeoutMs);
+ configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
+ configs.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
+ configs.put(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, retryBackoffMaxMs);
+ configs.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
+ configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
+ configs.put(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, throwOnStableOffsetNotSupported);
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
+ groupInstanceId.ifPresent(gi -> configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, gi));
+
+ return new ConsumerConfig(configs);
}
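// --- Sketch (illustrative; broker address, group id, and topic are
// placeholders) of how the group.protocol key configured above is set by an
// ordinary client. GENERIC selects the legacy implementation and CONSUMER
// selects the new KIP-848 implementation behind the same KafkaConsumer facade.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupProtocolConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group");
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.GENERIC.name());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("sketch-topic"));
            consumer.poll(Duration.ZERO); // protocol choice is transparent here
        }
    }
}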
private static class FetchInfo {
@@ -2748,8 +2967,11 @@ public class KafkaConsumerTest {
}
}
- @Test
- public void testSubscriptionOnInvalidTopic() {
+ // TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
+ // Once it is implemented, this should use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testSubscriptionOnInvalidTopic(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -2767,22 +2989,23 @@ public class KafkaConsumerTest {
topicMetadata);
client.prepareMetadataUpdate(updateResponse);
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer));
assertThrows(InvalidTopicException.class, () -> consumer.poll(Duration.ZERO));
}
- @Test
- public void testPollTimeMetrics() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testPollTimeMetrics(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singletonList(topic));
// MetricName objects to check
- Metrics metrics = consumer.metrics;
+ Metrics metrics = consumer.metricsRegistry();
MetricName lastPollSecondsAgoName = metrics.metricName("last-poll-seconds-ago", "consumer-metrics");
MetricName timeBetweenPollAvgName = metrics.metricName("time-between-poll-avg", "consumer-metrics");
MetricName timeBetweenPollMaxName = metrics.metricName("time-between-poll-max", "consumer-metrics");
@@ -2818,32 +3041,33 @@ public class KafkaConsumerTest {
assertEquals(10 * 1000d, consumer.metrics().get(timeBetweenPollMaxName).metricValue());
}
- @Test
- public void testPollIdleRatio() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testPollIdleRatio(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
// MetricName object to check
- Metrics metrics = consumer.metrics;
+ Metrics metrics = consumer.metricsRegistry();
MetricName pollIdleRatio = metrics.metricName("poll-idle-ratio-avg", "consumer-metrics");
// Test default value
assertEquals(Double.NaN, consumer.metrics().get(pollIdleRatio).metricValue());
// 1st poll
// Spend 50ms in poll so value = 1.0
- consumer.kafkaConsumerMetrics.recordPollStart(time.milliseconds());
+ consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds());
time.sleep(50);
- consumer.kafkaConsumerMetrics.recordPollEnd(time.milliseconds());
+ consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds());
assertEquals(1.0d, consumer.metrics().get(pollIdleRatio).metricValue());
// 2nd poll
// Spend 50ms outside poll and 0ms in poll so value = 0.0
time.sleep(50);
- consumer.kafkaConsumerMetrics.recordPollStart(time.milliseconds());
- consumer.kafkaConsumerMetrics.recordPollEnd(time.milliseconds());
+ consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds());
+ consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds());
// Avg of first two data points
assertEquals((1.0d + 0.0d) / 2, consumer.metrics().get(pollIdleRatio).metricValue());
@@ -2851,9 +3075,9 @@ public class KafkaConsumerTest {
// 3rd poll
// Spend 25ms outside poll and 25ms in poll so value = 0.5
time.sleep(25);
- consumer.kafkaConsumerMetrics.recordPollStart(time.milliseconds());
+ consumer.kafkaConsumerMetrics().recordPollStart(time.milliseconds());
time.sleep(25);
- consumer.kafkaConsumerMetrics.recordPollEnd(time.milliseconds());
+ consumer.kafkaConsumerMetrics().recordPollEnd(time.milliseconds());
// Avg of three data points
assertEquals((1.0d + 0.0d + 0.5d) / 3, consumer.metrics().get(pollIdleRatio).metricValue());
@@ -2861,16 +3085,17 @@ public class KafkaConsumerTest {
private static boolean consumerMetricPresent(KafkaConsumer<String, String> consumer, String name) {
MetricName metricName = new MetricName(name, "consumer-metrics", "", Collections.emptyMap());
- return consumer.metrics.metrics().containsKey(metricName);
+ return consumer.metricsRegistry().metrics().containsKey(metricName);
}
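
The move from the consumer.metrics and consumer.kafkaConsumerMetrics fields to metricsRegistry() and kafkaConsumerMetrics() accessors is a direct consequence of the delegation refactor: the state these tests used to read now lives on the internal delegate, so KafkaConsumer exposes forwarding accessors instead of fields. A hedged sketch of that shape (the interface and names below are illustrative, not the commit's exact code):

    import org.apache.kafka.common.metrics.Metrics;

    // Illustrative only: the wrapper no longer owns the Metrics instance
    // and forwards the accessor call to its delegate.
    interface MetricsSource {
        Metrics metricsRegistry();
    }

    class ForwardingConsumerSketch implements MetricsSource {
        private final MetricsSource delegate;

        ForwardingConsumerSketch(MetricsSource delegate) {
            this.delegate = delegate;
        }

        @Override
        public Metrics metricsRegistry() {
            return delegate.metricsRegistry(); // forward rather than own the field
        }
    }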
- @Test
- public void testClosingConsumerUnregistersConsumerMetrics() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupProtocol) {
Time time = new MockTime(1L);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata,
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
new RoundRobinAssignor(), true, groupInstanceId);
consumer.subscribe(singletonList(topic));
assertTrue(consumerMetricPresent(consumer, "last-poll-seconds-ago"));
@@ -2882,19 +3107,23 @@ public class KafkaConsumerTest {
assertFalse(consumerMetricPresent(consumer, "time-between-poll-max"));
}
- @Test
- public void testEnforceRebalanceWithManualAssignment() {
- consumer = newConsumer((String) null);
+ // NOTE: this test uses the enforceRebalance API which is not implemented in the CONSUMER group protocol.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testEnforceRebalanceWithManualAssignment(GroupProtocol groupProtocol) {
+ consumer = newConsumer(groupProtocol, null);
consumer.assign(singleton(new TopicPartition("topic", 0)));
assertThrows(IllegalStateException.class, consumer::enforceRebalance);
}
- @Test
- public void testEnforceRebalanceTriggersRebalanceOnNextPoll() {
+ // NOTE: this test uses the enforceRebalance API which is not implemented in the CONSUMER group protocol.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testEnforceRebalanceTriggersRebalanceOnNextPoll(GroupProtocol groupProtocol) {
Time time = new MockTime(1L);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
- KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
MockRebalanceListener countingRebalanceListener = new MockRebalanceListener();
initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
@@ -2918,8 +3147,10 @@ public class KafkaConsumerTest {
assertEquals(countingRebalanceListener.revokedCount, 1);
}
- @Test
- public void testEnforceRebalanceReason() {
+ // NOTE: this test uses the enforceRebalance API which is not implemented in the CONSUMER group protocol.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testEnforceRebalanceReason(GroupProtocol groupProtocol) {
Time time = new MockTime(1L);
ConsumerMetadata metadata = createMetadata(subscription);
@@ -2928,6 +3159,7 @@ public class KafkaConsumerTest {
Node node = metadata.fetch().nodes().get(0);
consumer = newConsumer(
+ groupProtocol,
time,
client,
subscription,
@@ -2978,24 +3210,30 @@ public class KafkaConsumerTest {
);
}
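
For context on the three NOTE-annotated tests: enforceRebalance asks the consumer to rejoin its group on the next poll, and it is only meaningful for subscription-based assignment, which is why the manual-assignment test above expects IllegalStateException. A hedged usage sketch (topic name and reason string are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class EnforceRebalanceSketch {
        static void triggerRebalance(KafkaConsumer<String, String> consumer) {
            consumer.subscribe(Collections.singleton("input-topic"));
            consumer.enforceRebalance("example reason"); // reason overload, as testEnforceRebalanceReason exercises
            consumer.poll(Duration.ofMillis(100));       // the rebalance takes effect on this poll
        }
    }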
- @Test
- public void configurableObjectsShouldSeeGeneratedClientId() {
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void configurableObjectsShouldSeeGeneratedClientId(GroupProtocol groupProtocol) {
Properties props = new Properties();
+ props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DeserializerForClientId.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerForClientId.class.getName());
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorForClientId.class.getName());
- consumer = new KafkaConsumer<>(props);
- assertNotNull(consumer.getClientId());
- assertNotEquals(0, consumer.getClientId().length());
+ consumer = newConsumer(props);
+ assertNotNull(consumer.clientId());
+ assertNotEquals(0, consumer.clientId().length());
assertEquals(3, CLIENT_IDS.size());
- CLIENT_IDS.forEach(id -> assertEquals(id, consumer.getClientId()));
+ CLIENT_IDS.forEach(id -> assertEquals(id, consumer.clientId()));
}
- @Test
- public void testUnusedConfigs() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testUnusedConfigs(GroupProtocol groupProtocol) {
Map<String, Object> props = new HashMap<>();
+ props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS");
ConsumerConfig config = new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(props, new StringDeserializer(), new StringDeserializer()));
@@ -3006,45 +3244,56 @@ public class KafkaConsumerTest {
assertTrue(config.unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));
}
- @Test
- public void testAssignorNameConflict() {
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testAssignorNameConflict(GroupProtocol groupProtocol) {
Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Arrays.asList(RangeAssignor.class.getName(), ConsumerPartitionAssignorTest.TestConsumerPartitionAssignor.class.getName()));
assertThrows(KafkaException.class,
- () -> new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer()));
+ () -> newConsumer(configs, new StringDeserializer(), new StringDeserializer()));
}
- @Test
- public void testOffsetsForTimesTimeout() {
- final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testOffsetsForTimesTimeout(GroupProtocol groupProtocol) {
+ final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException(groupProtocol);
assertEquals(
"Failed to get offsets by times in 60000ms",
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()
);
}
- @Test
- public void testBeginningOffsetsTimeout() {
- final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testBeginningOffsetsTimeout(GroupProtocol groupProtocol) {
+ final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException(groupProtocol);
assertEquals(
"Failed to get offsets by times in 60000ms",
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.beginningOffsets(singletonList(tp0))).getMessage()
);
}
- @Test
- public void testEndOffsetsTimeout() {
- final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
+ // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
+ // The bug will be investigated and fixed so this test can use both group protocols.
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "GENERIC")
+ public void testEndOffsetsTimeout(GroupProtocol groupProtocol) {
+ final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException(groupProtocol);
assertEquals(
"Failed to get offsets by times in 60000ms",
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.endOffsets(singletonList(tp0))).getMessage()
);
}
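
The 60000ms in the expected messages comes from the consumer's default.api.timeout.ms, which bounds blocking calls such as offsetsForTimes, beginningOffsets, and endOffsets when no explicit timeout is supplied. A small sketch of overriding it (values are illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    class ApiTimeoutSketch {
        static Properties withShorterApiTimeout() {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Blocking offset lookups without an explicit timeout now fail
            // after 15s instead of the 60s default.
            props.setProperty(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "15000");
            return props;
        }
    }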
- private KafkaConsumer<String, String> consumerForCheckingTimeoutException() {
+ private KafkaConsumer<String, String> consumerForCheckingTimeoutException(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -3052,7 +3301,7 @@ public class KafkaConsumerTest {
ConsumerPartitionAssignor assignor = new RangeAssignor();
- final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+ final KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
for (int i = 0; i < 10; i++) {
client.prepareResponse(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
similarity index 98%
rename from clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
rename to clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index d67f509d905..7f506ec8f9f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -69,10 +69,10 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-public class PrototypeAsyncConsumerTest {
+public class AsyncKafkaConsumerTest {
- private PrototypeAsyncConsumer<?, ?> consumer;
- private ConsumerTestBuilder.PrototypeAsyncConsumerTestBuilder testBuilder;
+ private AsyncKafkaConsumer<?, ?> consumer;
+ private ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder testBuilder;
private ApplicationEventHandler applicationEventHandler;
@BeforeEach
@@ -82,7 +82,7 @@ public class PrototypeAsyncConsumerTest {
}
private void setup(Optional<ConsumerTestBuilder.GroupInformation> groupInfo) {
- testBuilder = new ConsumerTestBuilder.PrototypeAsyncConsumerTestBuilder(groupInfo);
+ testBuilder = new ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder(groupInfo);
applicationEventHandler = testBuilder.applicationEventHandler;
consumer = testBuilder.consumer;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 4751cdea09d..7e8dcbead7b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -57,6 +58,7 @@ import java.util.stream.Stream;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID;
@@ -497,6 +499,10 @@ public class CommitRequestManagerTest {
private CommitRequestManager create(final boolean autoCommitEnabled, final long autoCommitInterval) {
props.setProperty(AUTO_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(autoCommitInterval));
props.setProperty(ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(autoCommitEnabled));
+
+ if (autoCommitEnabled)
+ props.setProperty(GROUP_ID_CONFIG, TestUtils.randomString(10));
+
return spy(new CommitRequestManager(
this.time,
this.logContext,
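
The new guard encodes the invariant behind it: offsets are auto-committed on behalf of a consumer group, so enabling auto-commit requires a group.id. A minimal illustration of the configuration pairing (the group id value is a placeholder):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    class AutoCommitConfigSketch {
        static Properties autoCommitProps() {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            // Auto-commit targets a consumer group, so a group id must accompany it.
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "commit-manager-example");
            return props;
        }
    }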
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index b36f1958f7d..03ab1893289 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -304,11 +304,11 @@ public class ConsumerTestBuilder implements Closeable {
}
}
- public static class PrototypeAsyncConsumerTestBuilder extends ApplicationEventHandlerTestBuilder {
+ public static class AsyncKafkaConsumerTestBuilder extends ApplicationEventHandlerTestBuilder {
- final PrototypeAsyncConsumer<String, String> consumer;
+ final AsyncKafkaConsumer<String, String> consumer;
- public PrototypeAsyncConsumerTestBuilder(Optional<GroupInformation> groupInfo) {
+ public AsyncKafkaConsumerTestBuilder(Optional<GroupInformation> groupInfo) {
super(groupInfo);
String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
List<ConsumerPartitionAssignor> assignors = ConsumerPartitionAssignor.getAssignorInstances(
@@ -323,7 +323,7 @@ public class ConsumerTestBuilder implements Closeable {
deserializers,
metricsManager,
time);
- this.consumer = spy(new PrototypeAsyncConsumer<>(
+ this.consumer = spy(new AsyncKafkaConsumer<>(
logContext,
clientId,
deserializers,
diff --git a/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
index 6d09575a463..717f1179753 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala
@@ -17,10 +17,12 @@
package kafka.api
import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.consumer.{ConsumerConfig, GroupProtocol}
import org.junit.jupiter.api.Assertions.{assertNotNull, assertNull, assertTrue}
import org.junit.jupiter.api.Test
import java.time.Duration
+import java.util.Properties
import scala.jdk.CollectionConverters._
class BaseAsyncConsumerTest extends AbstractConsumerTest {
@@ -28,7 +30,9 @@ class BaseAsyncConsumerTest extends AbstractConsumerTest {
@Test
def testCommitAsync(): Unit = {
- val consumer = createAsyncConsumer()
+ val props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name());
+ val consumer = createConsumer(configOverrides = props)
val producer = createProducer()
val numRecords = 10000
val startingTimestamp = System.currentTimeMillis()
@@ -49,7 +53,9 @@ class BaseAsyncConsumerTest extends AbstractConsumerTest {
@Test
def testCommitSync(): Unit = {
- val consumer = createAsyncConsumer()
+ val props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name());
+ val consumer = createConsumer(configOverrides = props)
val producer = createProducer()
val numRecords = 10000
val startingTimestamp = System.currentTimeMillis()
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 1a861b2b2f1..4eaa83f0eb6 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -27,7 +27,6 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
-import org.apache.kafka.clients.consumer.internals.PrototypeAsyncConsumer
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -171,19 +170,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
producer
}
- def createAsyncConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
- valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
- configOverrides: Properties = new Properties,
- configsToRemove: List[String] = List()): PrototypeAsyncConsumer[K, V] = {
- val props = new Properties
- props ++= consumerConfig
- props ++= configOverrides
- configsToRemove.foreach(props.remove(_))
- val consumer = new PrototypeAsyncConsumer[K, V](props, keyDeserializer, valueDeserializer)
- consumers += consumer
- consumer
- }
-
def createConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
configOverrides: Properties = new Properties,
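
With createAsyncConsumer removed, the refactored consumer is no longer a separate class for tests to instantiate; the integration tests above opt into the new protocol purely through configuration, the same way an application would. A hedged Java equivalent of that Scala pattern (broker address and group id are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.GroupProtocol;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    class NewProtocolSelectionSketch {
        static KafkaConsumer<String, String> newProtocolConsumer() {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
            // CONSUMER selects the new KIP-848 protocol; GENERIC selects the legacy path.
            props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name());
            return new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
        }
    }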
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 880806c87b2..9d2971a3ceb 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -586,4 +586,14 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="WA_AWAIT_NOT_IN_LOOP"/>
</Match>
+ <Match>
+ <!-- Suppress the warnings about the caller not using the return value from Consumer. KafkaConsumer now
+ delegates all API calls to an internal instance of Consumer, which confuses SpotBugs. -->
+ <Or>
+ <Class name="org.apache.kafka.tools.ClientCompatibilityTest"/>
+ <Class name="org.apache.kafka.tools.StreamsResetter"/>
+ </Or>
+ <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
+ </Match>
+
</FindBugsFilter>
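
The suppression above follows from the delegation described in its comment: once a method like poll is a pure pass-through to the delegate, SpotBugs may treat it as side-effect free and flag callers that ignore its return value. A hedged sketch of the shape that triggers RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT (names are illustrative, not the commit's exact code):

    import java.time.Duration;

    // Illustrative delegation shape: the outer class implements its API by
    // forwarding every call, return value included, to an inner implementation.
    interface Api {
        String poll(Duration timeout);
    }

    class DelegatingApi implements Api {
        private final Api delegate;

        DelegatingApi(Api delegate) {
            this.delegate = delegate;
        }

        @Override
        public String poll(Duration timeout) {
            // Pure pass-through; analysis may then conclude poll() has no
            // side effects and warn when callers drop the result.
            return delegate.poll(timeout);
        }
    }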
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
index a706d74d66e..7f00f9ddd48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.streams.integration;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -131,9 +133,14 @@ public class TaskAssignorIntegrationTest {
final Field mainConsumer = StreamThread.class.getDeclaredField("mainConsumer");
mainConsumer.setAccessible(true);
- final KafkaConsumer<?, ?> consumer = (KafkaConsumer<?, ?>) mainConsumer.get(streamThread);
+ final KafkaConsumer<?, ?> parentConsumer = (KafkaConsumer<?, ?>) mainConsumer.get(streamThread);
- final Field assignors = KafkaConsumer.class.getDeclaredField("assignors");
+ final Field delegate = KafkaConsumer.class.getDeclaredField("delegate");
+ delegate.setAccessible(true);
+ final Consumer<?, ?> consumer = (Consumer<?, ?>) delegate.get(parentConsumer);
+ assertThat(consumer, instanceOf(LegacyKafkaConsumer.class));
+
+ final Field assignors = LegacyKafkaConsumer.class.getDeclaredField("assignors");
assignors.setAccessible(true);
final List<ConsumerPartitionAssignor> consumerPartitionAssignors = (List<ConsumerPartitionAssignor>) assignors.get(consumer);
final StreamsPartitionAssignor streamsPartitionAssignor = (StreamsPartitionAssignor) consumerPartitionAssignors.get(0);