You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/17 17:58:57 UTC
[kafka] branch trunk updated: KAFKA-7853: Refactor coordinator
config (#6854)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1b9e107 KAFKA-7853: Refactor coordinator config (#6854)
1b9e107 is described below
commit 1b9e1073885951697f34950a1ea706c93826e871
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Mon Jun 17 10:58:43 2019 -0700
KAFKA-7853: Refactor coordinator config (#6854)
An attempt to refactor current coordinator logic.
Reviewers: Stanislav Kozlovski <st...@outlook.com>, Konstantine Karantasis <ko...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../apache/kafka/clients/CommonClientConfigs.java | 38 +++++++-
.../apache/kafka/clients/GroupRebalanceConfig.java | 100 +++++++++++++++++++
.../kafka/clients/consumer/ConsumerConfig.java | 37 ++-----
.../kafka/clients/consumer/KafkaConsumer.java | 29 ++----
.../consumer/internals/AbstractCoordinator.java | 106 ++++++++-------------
.../consumer/internals/ConsumerCoordinator.java | 51 ++++------
.../clients/consumer/internals/Heartbeat.java | 37 +++----
.../kafka/clients/CommonClientConfigsTest.java | 2 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 42 ++++----
.../internals/AbstractCoordinatorTest.java | 34 +++++--
.../internals/ConsumerCoordinatorTest.java | 96 ++++++++++++-------
.../clients/consumer/internals/HeartbeatTest.java | 21 +++-
.../runtime/distributed/DistributedConfig.java | 13 +--
.../runtime/distributed/WorkerCoordinator.java | 22 ++---
.../runtime/distributed/WorkerGroupMember.java | 9 +-
.../WorkerCoordinatorIncrementalTest.java | 37 +++----
.../runtime/distributed/WorkerCoordinatorTest.java | 38 ++++----
17 files changed, 409 insertions(+), 303 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 49465dc..d8428a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
import java.util.Map;
/**
- * Some configurations shared by both producer and consumer
+ * Configurations shared by Kafka client applications: producer, consumer, connect, etc.
*/
public class CommonClientConfigs {
private static final Logger log = LoggerFactory.getLogger(CommonClientConfigs.class);
@@ -101,6 +101,42 @@ public class CommonClientConfigs {
+ "elapses the client will resend the request if necessary or fail the request if "
+ "retries are exhausted.";
+ public static final String GROUP_ID_CONFIG = "group.id";
+ public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
+
+ public static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id";
+ public static final String GROUP_INSTANCE_ID_DOC = "A unique identifier of the consumer instance provided by end user. " +
+ "Only non-empty strings are permitted. If set, the consumer is treated as a static member, " +
+ "which means that only one instance with this ID is allowed in the consumer group at any time. " +
+ "This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability " +
+ "(e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.";
+
+ public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
+ public static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using " +
+ "consumer group management. This places an upper bound on the amount of time that the consumer can be idle " +
+ "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer " +
+ "is considered failed and the group will rebalance in order to reassign the partitions to another member. ";
+
+ public static final String REBALANCE_TIMEOUT_MS_CONFIG = "rebalance.timeout.ms";
+ public static final String REBALANCE_TIMEOUT_MS_DOC = "The maximum allowed time for each worker to join the group " +
+ "once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to " +
+ "flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed " +
+ "from the group, which will cause offset commit failures.";
+
+ public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
+ public static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect client failures when using " +
+ "Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness " +
+ "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, " +
+ "then the broker will remove this client from the group and initiate a rebalance. Note that the value " +
+ "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> " +
+ "and <code>group.max.session.timeout.ms</code>.";
+
+ public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
+ public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer " +
+ "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " +
+ "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. " +
+ "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher " +
+ "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
/**
* Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
diff --git a/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
new file mode 100644
index 0000000..006800a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * Class to extract group rebalance related configs.
+ */
+public class GroupRebalanceConfig {
+
+ public enum ProtocolType {
+ CONSUMER,
+ CONNECT;
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase(Locale.ROOT);
+ }
+ }
+
+ public final int sessionTimeoutMs;
+ public final int rebalanceTimeoutMs;
+ public final int heartbeatIntervalMs;
+ public final String groupId;
+ public final Optional<String> groupInstanceId;
+ public final long retryBackoffMs;
+ public final boolean leaveGroupOnClose;
+
+ public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) {
+ this.sessionTimeoutMs = config.getInt(CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG);
+
+ // Consumer and Connect use different config names for defining rebalance timeout
+ if (protocolType == ProtocolType.CONSUMER) {
+ this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+ } else {
+ this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG);
+ }
+
+ this.heartbeatIntervalMs = config.getInt(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG);
+ this.groupId = config.getString(CommonClientConfigs.GROUP_ID_CONFIG);
+
+ // Static membership is only introduced in consumer API.
+ if (protocolType == ProtocolType.CONSUMER) {
+ String groupInstanceId = config.getString(CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG);
+ if (groupInstanceId != null) {
+ JoinGroupRequest.validateGroupInstanceId(groupInstanceId);
+ this.groupInstanceId = Optional.of(groupInstanceId);
+ } else {
+ this.groupInstanceId = Optional.empty();
+ }
+ } else {
+ this.groupInstanceId = Optional.empty();
+ }
+
+ this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
+
+ // Internal leave group config is only defined in Consumer.
+ if (protocolType == ProtocolType.CONSUMER) {
+ this.leaveGroupOnClose = config.getBoolean("internal.leave.group.on.close");
+ } else {
+ this.leaveGroupOnClose = true;
+ }
+ }
+
+ // For testing purpose.
+ public GroupRebalanceConfig(final int sessionTimeoutMs,
+ final int rebalanceTimeoutMs,
+ final int heartbeatIntervalMs,
+ String groupId,
+ Optional<String> groupInstanceId,
+ long retryBackoffMs,
+ boolean leaveGroupOnClose) {
+ this.sessionTimeoutMs = sessionTimeoutMs;
+ this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+ this.heartbeatIntervalMs = heartbeatIntervalMs;
+ this.groupId = groupId;
+ this.groupInstanceId = groupInstanceId;
+ this.retryBackoffMs = retryBackoffMs;
+ this.leaveGroupOnClose = leaveGroupOnClose;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 010fff8..7eb34d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -50,50 +50,33 @@ public class ConsumerConfig extends AbstractConfig {
/**
* <code>group.id</code>
*/
- public static final String GROUP_ID_CONFIG = "group.id";
- private static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
+ public static final String GROUP_ID_CONFIG = CommonClientConfigs.GROUP_ID_CONFIG;
+ private static final String GROUP_ID_DOC = CommonClientConfigs.GROUP_ID_DOC;
/**
* <code>group.instance.id</code>
*/
- public static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id";
- private static final String GROUP_INSTANCE_ID_DOC = "A unique identifier of the consumer instance provided by end user. " +
- "Only non-empty strings are permitted. If set, the consumer is treated as a static member, " +
- "which means that only one instance with this ID is allowed in the consumer group at any time. " +
- "This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability " +
- "(e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.";
+ public static final String GROUP_INSTANCE_ID_CONFIG = CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG;
+ private static final String GROUP_INSTANCE_ID_DOC = CommonClientConfigs.GROUP_INSTANCE_ID_DOC;
/** <code>max.poll.records</code> */
public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";
/** <code>max.poll.interval.ms</code> */
- public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
- private static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll() when using " +
- "consumer group management. This places an upper bound on the amount of time that the consumer can be idle " +
- "before fetching more records. If poll() is not called before expiration of this timeout, then the consumer " +
- "is considered failed and the group will rebalance in order to reassign the partitions to another member. ";
-
+ public static final String MAX_POLL_INTERVAL_MS_CONFIG = CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG;
+ private static final String MAX_POLL_INTERVAL_MS_DOC = CommonClientConfigs.MAX_POLL_INTERVAL_MS_DOC;
/**
* <code>session.timeout.ms</code>
*/
- public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
- private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect consumer failures when using " +
- "Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness " +
- "to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, " +
- "then the broker will remove this consumer from the group and initiate a rebalance. Note that the value " +
- "must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> " +
- "and <code>group.max.session.timeout.ms</code>.";
+ public static final String SESSION_TIMEOUT_MS_CONFIG = CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;
+ private static final String SESSION_TIMEOUT_MS_DOC = CommonClientConfigs.SESSION_TIMEOUT_MS_DOC;
/**
* <code>heartbeat.interval.ms</code>
*/
- public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
- private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer " +
- "coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " +
- "consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. " +
- "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher " +
- "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+ public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
+ private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
/**
* <code>bootstrap.servers</code>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5be065d..79afafa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
@@ -27,7 +28,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
-import org.apache.kafka.clients.consumer.internals.Heartbeat;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
@@ -50,7 +50,6 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.IsolationLevel;
-import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
@@ -568,7 +567,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final Logger log;
private final String clientId;
private String groupId;
- private Optional<String> groupInstanceId;
private final ConsumerCoordinator coordinator;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
@@ -674,15 +672,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.clientId = clientId;
this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
+ GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
+ GroupRebalanceConfig.ProtocolType.CONSUMER);
LogContext logContext;
// If group.instance.id is set, we will append it to the log context.
- String groupInstanceId = config.getString(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
- if (groupInstanceId != null) {
- JoinGroupRequest.validateGroupInstanceId(groupInstanceId);
- this.groupInstanceId = Optional.of(groupInstanceId);
- logContext = new LogContext("[Consumer instanceId=" + groupInstanceId + ", clientId=" + clientId + ", groupId=" + groupId + "] ");
+ if (groupRebalanceConfig.groupInstanceId.isPresent()) {
+ logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() +
+ ", clientId=" + clientId + ", groupId=" + groupId + "] ");
} else {
- this.groupInstanceId = Optional.empty();
logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
}
@@ -773,28 +770,20 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
PartitionAssignor.class);
- int maxPollIntervalMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
- int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
// no coordinator will be constructed for the default (null) group id
this.coordinator = groupId == null ? null :
- new ConsumerCoordinator(logContext,
+ new ConsumerCoordinator(groupRebalanceConfig,
+ logContext,
this.client,
- groupId,
- this.groupInstanceId,
- maxPollIntervalMs,
- sessionTimeoutMs,
- new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
assignors,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
this.time,
- retryBackoffMs,
enableAutoCommit,
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
- this.interceptors,
- config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
+ this.interceptors);
this.fetcher = new Fetcher<>(
logContext,
this.client,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 30277b3..efe6cb7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -71,7 +72,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -114,15 +114,11 @@ public abstract class AbstractCoordinator implements Closeable {
}
private final Logger log;
- private final int sessionTimeoutMs;
private final GroupCoordinatorMetrics sensors;
private final Heartbeat heartbeat;
- protected final int rebalanceTimeoutMs;
- protected final String groupId;
- protected final Optional<String> groupInstanceId;
+ private final GroupRebalanceConfig rebalanceConfig;
protected final ConsumerNetworkClient client;
protected final Time time;
- protected final long retryBackoffMs;
private HeartbeatThread heartbeatThread = null;
private boolean rejoinNeeded = true;
@@ -133,52 +129,24 @@ public abstract class AbstractCoordinator implements Closeable {
private Generation generation = Generation.NO_GENERATION;
private RequestFuture<Void> findCoordinatorFuture = null;
- private final boolean leaveGroupOnClose;
/**
* Initialize the coordination manager.
*/
- public AbstractCoordinator(LogContext logContext,
+ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
+ LogContext logContext,
ConsumerNetworkClient client,
- String groupId,
- Optional<String> groupInstanceId,
- int rebalanceTimeoutMs,
- int sessionTimeoutMs,
- Heartbeat heartbeat,
Metrics metrics,
String metricGrpPrefix,
- Time time,
- long retryBackoffMs,
- boolean leaveGroupOnClose) {
+ Time time) {
+ Objects.requireNonNull(rebalanceConfig.groupId,
+ "Expected a non-null group id for coordinator construction");
+ this.rebalanceConfig = rebalanceConfig;
this.log = logContext.logger(AbstractCoordinator.class);
this.client = client;
this.time = time;
- this.groupId = Objects.requireNonNull(groupId,
- "Expected a non-null group id for coordinator construction");
- this.groupInstanceId = groupInstanceId;
- this.rebalanceTimeoutMs = rebalanceTimeoutMs;
- this.sessionTimeoutMs = sessionTimeoutMs;
- this.heartbeat = heartbeat;
+ this.heartbeat = new Heartbeat(rebalanceConfig, time);
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
- this.retryBackoffMs = retryBackoffMs;
- this.leaveGroupOnClose = leaveGroupOnClose;
- }
-
- public AbstractCoordinator(LogContext logContext,
- ConsumerNetworkClient client,
- String groupId,
- Optional<String> groupInstanceId,
- int rebalanceTimeoutMs,
- int sessionTimeoutMs,
- int heartbeatIntervalMs,
- Metrics metrics,
- String metricGrpPrefix,
- Time time,
- long retryBackoffMs,
- boolean leaveGroupOnClose) {
- this(logContext, client, groupId, groupInstanceId, rebalanceTimeoutMs, sessionTimeoutMs,
- new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs),
- metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose);
}
/**
@@ -263,7 +231,7 @@ public abstract class AbstractCoordinator implements Closeable {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
markCoordinatorUnknown();
- timer.sleep(retryBackoffMs);
+ timer.sleep(rebalanceConfig.retryBackoffMs);
}
} while (coordinatorUnknown() && timer.notExpired());
@@ -438,7 +406,7 @@ public abstract class AbstractCoordinator implements Closeable {
else if (!future.isRetriable())
throw exception;
- timer.sleep(retryBackoffMs);
+ timer.sleep(rebalanceConfig.retryBackoffMs);
}
}
return true;
@@ -505,13 +473,13 @@ public abstract class AbstractCoordinator implements Closeable {
log.info("(Re-)joining group");
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
- .setGroupId(groupId)
- .setSessionTimeoutMs(this.sessionTimeoutMs)
+ .setGroupId(rebalanceConfig.groupId)
+ .setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
.setMemberId(this.generation.memberId)
- .setGroupInstanceId(this.groupInstanceId.orElse(null))
+ .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setProtocolType(protocolType())
.setProtocols(metadata())
- .setRebalanceTimeoutMs(this.rebalanceTimeoutMs)
+ .setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
);
log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
@@ -519,7 +487,7 @@ public abstract class AbstractCoordinator implements Closeable {
// Note that we override the request timeout using the rebalance timeout since that is the
// maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.
- int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
+ int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
.compose(new JoinGroupResponseHandler());
}
@@ -573,9 +541,9 @@ public abstract class AbstractCoordinator implements Closeable {
// log the error and re-throw the exception
log.error("Attempt to join group failed due to fatal error: {}", error.message());
if (error == Errors.GROUP_MAX_SIZE_REACHED) {
- future.raise(new GroupMaxSizeReachedException(groupId));
+ future.raise(new GroupMaxSizeReachedException(rebalanceConfig.groupId));
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
- future.raise(new GroupAuthorizationException(groupId));
+ future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else {
future.raise(error);
}
@@ -606,9 +574,9 @@ public abstract class AbstractCoordinator implements Closeable {
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
- .setGroupId(groupId)
+ .setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
- .setGroupInstanceId(this.groupInstanceId.orElse(null))
+ .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(Collections.emptyList())
);
@@ -633,9 +601,9 @@ public abstract class AbstractCoordinator implements Closeable {
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
- .setGroupId(groupId)
+ .setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
- .setGroupInstanceId(this.groupInstanceId.orElse(null))
+ .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(groupAssignmentList)
);
@@ -665,7 +633,7 @@ public abstract class AbstractCoordinator implements Closeable {
requestRejoin();
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
- future.raise(new GroupAuthorizationException(groupId));
+ future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.debug("SyncGroup failed because the group began another rebalance");
future.raise(error);
@@ -701,7 +669,7 @@ public abstract class AbstractCoordinator implements Closeable {
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
- .setKey(this.groupId));
+ .setKey(this.rebalanceConfig.groupId));
return client.send(node, requestBuilder)
.compose(new FindCoordinatorResponseHandler());
}
@@ -731,7 +699,7 @@ public abstract class AbstractCoordinator implements Closeable {
}
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
- future.raise(new GroupAuthorizationException(groupId));
+ future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else {
log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
future.raise(error);
@@ -854,7 +822,7 @@ public abstract class AbstractCoordinator implements Closeable {
// Synchronize after closing the heartbeat thread since heartbeat thread
// needs this lock to complete and terminate after close flag is set.
synchronized (this) {
- if (leaveGroupOnClose) {
+ if (rebalanceConfig.leaveGroupOnClose) {
maybeLeaveGroup();
}
@@ -883,7 +851,7 @@ public abstract class AbstractCoordinator implements Closeable {
// attempt any resending if the request fails or times out.
log.info("Member {} sending LeaveGroup request to coordinator {}", generation.memberId, coordinator);
LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(new LeaveGroupRequestData()
- .setGroupId(groupId).setMemberId(generation.memberId));
+ .setGroupId(rebalanceConfig.groupId).setMemberId(generation.memberId));
client.send(coordinator, request)
.compose(new LeaveGroupResponseHandler());
client.pollNoWakeup();
@@ -893,7 +861,7 @@ public abstract class AbstractCoordinator implements Closeable {
}
protected boolean isDynamicMember() {
- return !groupInstanceId.isPresent();
+ return !rebalanceConfig.groupInstanceId.isPresent();
}
private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
@@ -915,9 +883,9 @@ public abstract class AbstractCoordinator implements Closeable {
log.debug("Sending Heartbeat request to coordinator {}", coordinator);
HeartbeatRequest.Builder requestBuilder =
new HeartbeatRequest.Builder(new HeartbeatRequestData()
- .setGroupId(groupId)
+ .setGroupId(rebalanceConfig.groupId)
.setMemberId(this.generation.memberId)
- .setGroupInstanceId(this.groupInstanceId.orElse(null))
+ .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(this.generation.generationId));
return client.send(coordinator, requestBuilder)
.compose(new HeartbeatResponseHandler());
@@ -953,7 +921,7 @@ public abstract class AbstractCoordinator implements Closeable {
resetGeneration();
future.raise(error);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
- future.raise(new GroupAuthorizationException(groupId));
+ future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else {
future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
}
@@ -1051,7 +1019,7 @@ public abstract class AbstractCoordinator implements Closeable {
private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
private HeartbeatThread() {
- super(HEARTBEAT_THREAD_PREFIX + (groupId.isEmpty() ? "" : " | " + groupId), true);
+ super(HEARTBEAT_THREAD_PREFIX + (rebalanceConfig.groupId.isEmpty() ? "" : " | " + rebalanceConfig.groupId), true);
}
public void enable() {
@@ -1113,7 +1081,7 @@ public abstract class AbstractCoordinator implements Closeable {
if (findCoordinatorFuture != null || lookupCoordinator().failed())
// the immediate future check ensures that we backoff properly in the case that no
// brokers are available to connect to.
- AbstractCoordinator.this.wait(retryBackoffMs);
+ AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
@@ -1131,7 +1099,7 @@ public abstract class AbstractCoordinator implements Closeable {
} else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected
- AbstractCoordinator.this.wait(retryBackoffMs);
+ AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else {
heartbeat.sentHeartbeat(now);
@@ -1153,7 +1121,7 @@ public abstract class AbstractCoordinator implements Closeable {
// however, then the session timeout may expire before we can rejoin.
heartbeat.receiveHeartbeat();
} else if (e instanceof FencedInstanceIdException) {
- log.error("Caught fenced group.instance.id {} error in heartbeat thread", groupInstanceId);
+ log.error("Caught fenced group.instance.id {} error in heartbeat thread", rebalanceConfig.groupInstanceId);
heartbeatThread.failed.set(e);
heartbeatThread.disable();
} else {
@@ -1243,4 +1211,8 @@ public abstract class AbstractCoordinator implements Closeable {
}
+ // For testing only
+ public Heartbeat heartbeat() {
+ return heartbeat;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index fbf5db4..a590a1e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -63,7 +64,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,6 +74,7 @@ import java.util.stream.Collectors;
* This class manages the coordination process with the consumer coordinator.
*/
public final class ConsumerCoordinator extends AbstractCoordinator {
+ private final GroupRebalanceConfig rebalanceConfig;
private final Logger log;
private final List<PartitionAssignor> assignors;
private final ConsumerMetadata metadata;
@@ -120,36 +121,25 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
/**
* Initialize the coordination manager.
*/
- public ConsumerCoordinator(LogContext logContext,
+ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
+ LogContext logContext,
ConsumerNetworkClient client,
- String groupId,
- Optional<String> groupInstanceId,
- int rebalanceTimeoutMs,
- int sessionTimeoutMs,
- Heartbeat heartbeat,
List<PartitionAssignor> assignors,
ConsumerMetadata metadata,
SubscriptionState subscriptions,
Metrics metrics,
String metricGrpPrefix,
Time time,
- long retryBackoffMs,
boolean autoCommitEnabled,
int autoCommitIntervalMs,
- ConsumerInterceptors<?, ?> interceptors,
- boolean leaveGroupOnClose) {
- super(logContext,
+ ConsumerInterceptors<?, ?> interceptors) {
+ super(rebalanceConfig,
+ logContext,
client,
- groupId,
- groupInstanceId,
- rebalanceTimeoutMs,
- sessionTimeoutMs,
- heartbeat,
metrics,
metricGrpPrefix,
- time,
- retryBackoffMs,
- leaveGroupOnClose);
+ time);
+ this.rebalanceConfig = rebalanceConfig;
this.log = logContext.logger(ConsumerCoordinator.class);
this.metadata = metadata;
this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
@@ -459,7 +449,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
@Override
protected void onJoinPrepare(int generation, String memberId) {
// commit offsets prior to rebalance if auto-commit enabled
- maybeAutoCommitOffsetsSync(time.timer(rebalanceTimeoutMs));
+ maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
// execute the user's callback before rebalance
ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
@@ -558,7 +548,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
} else if (!future.isRetriable()) {
throw future.exception();
} else {
- timer.sleep(retryBackoffMs);
+ timer.sleep(rebalanceConfig.retryBackoffMs);
}
} else {
return null;
@@ -585,8 +575,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// visible for testing
void invokeCompletedOffsetCommitCallbacks() {
if (asyncCommitFenced.get()) {
- throw new FencedInstanceIdException("Get fenced exception for group.instance.id: " +
- groupInstanceId.orElse("unset_instance_id") + ", current member.id is " + memberId());
+ throw new FencedInstanceIdException("Get fenced exception for group.instance.id "
+ + rebalanceConfig.groupInstanceId.orElse("unset_instance_id")
+ + ", current member.id is " + memberId());
}
while (true) {
OffsetCommitCompletion completion = completedOffsetCommits.poll();
@@ -698,7 +689,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (future.failed() && !future.isRetriable())
throw future.exception();
- timer.sleep(retryBackoffMs);
+ timer.sleep(rebalanceConfig.retryBackoffMs);
} while (timer.notExpired());
return false;
@@ -723,7 +714,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (exception instanceof RetriableException) {
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
exception);
- nextAutoCommitTimer.updateAndReset(retryBackoffMs);
+ nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
} else {
log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
}
@@ -813,10 +804,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
- .setGroupId(this.groupId)
+ .setGroupId(this.rebalanceConfig.groupId)
.setGenerationId(generation.generationId)
.setMemberId(generation.memberId)
- .setGroupInstanceId(groupInstanceId.orElse(null))
+ .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
.setTopics(new ArrayList<>(requestTopicDataMap.values()))
);
@@ -857,7 +848,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
- future.raise(new GroupAuthorizationException(groupId));
+ future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
return;
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
unauthorizedTopics.add(tp.topic());
@@ -929,7 +920,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
log.debug("Fetching committed offsets for partitions: {}", partitions);
// construct the request
- OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(this.groupId,
+ OffsetFetchRequest.Builder requestBuilder = new OffsetFetchRequest.Builder(this.rebalanceConfig.groupId,
new ArrayList<>(partitions));
// send the request with a callback
@@ -952,7 +943,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
markCoordinatorUnknown();
future.raise(error);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
- future.raise(new GroupAuthorizationException(groupId));
+ future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else {
future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index 8a67f31..3bf8c92 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -23,10 +24,8 @@ import org.apache.kafka.common.utils.Timer;
* A helper class for managing the heartbeat to the coordinator
*/
public final class Heartbeat {
- private final int sessionTimeoutMs;
- private final int heartbeatIntervalMs;
private final int maxPollIntervalMs;
- private final long retryBackoffMs;
+ private final GroupRebalanceConfig rebalanceConfig;
private final Time time;
private final Timer heartbeatTimer;
private final Timer sessionTimer;
@@ -34,21 +33,15 @@ public final class Heartbeat {
private volatile long lastHeartbeatSend;
- public Heartbeat(Time time,
- int sessionTimeoutMs,
- int heartbeatIntervalMs,
- int maxPollIntervalMs,
- long retryBackoffMs) {
- if (heartbeatIntervalMs >= sessionTimeoutMs)
+ public Heartbeat(GroupRebalanceConfig config,
+ Time time) {
+ if (config.heartbeatIntervalMs >= config.sessionTimeoutMs)
throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
-
+ this.rebalanceConfig = config;
this.time = time;
- this.sessionTimeoutMs = sessionTimeoutMs;
- this.heartbeatIntervalMs = heartbeatIntervalMs;
- this.maxPollIntervalMs = maxPollIntervalMs;
- this.retryBackoffMs = retryBackoffMs;
- this.heartbeatTimer = time.timer(heartbeatIntervalMs);
- this.sessionTimer = time.timer(sessionTimeoutMs);
+ this.heartbeatTimer = time.timer(config.heartbeatIntervalMs);
+ this.sessionTimer = time.timer(config.sessionTimeoutMs);
+ this.maxPollIntervalMs = config.rebalanceTimeoutMs;
this.pollTimer = time.timer(maxPollIntervalMs);
}
@@ -66,17 +59,17 @@ public final class Heartbeat {
public void sentHeartbeat(long now) {
this.lastHeartbeatSend = now;
update(now);
- heartbeatTimer.reset(heartbeatIntervalMs);
+ heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
}
public void failHeartbeat() {
update(time.milliseconds());
- heartbeatTimer.reset(retryBackoffMs);
+ heartbeatTimer.reset(rebalanceConfig.retryBackoffMs);
}
public void receiveHeartbeat() {
update(time.milliseconds());
- sessionTimer.reset(sessionTimeoutMs);
+ sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
}
public boolean shouldHeartbeat(long now) {
@@ -100,14 +93,14 @@ public final class Heartbeat {
public void resetTimeouts() {
update(time.milliseconds());
- sessionTimer.reset(sessionTimeoutMs);
+ sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
pollTimer.reset(maxPollIntervalMs);
- heartbeatTimer.reset(heartbeatIntervalMs);
+ heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
}
public void resetSessionTimeout() {
update(time.milliseconds());
- sessionTimer.reset(sessionTimeoutMs);
+ sessionTimer.reset(rebalanceConfig.sessionTimeoutMs);
}
public boolean pollTimeoutExpired(long now) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java b/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
index 63a9312..d65e6d1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java
@@ -58,7 +58,7 @@ public class CommonClientConfigsTest {
}
@Test
- public void testExponentialBackoffDefaults() throws Exception {
+ public void testExponentialBackoffDefaults() {
TestConfig defaultConf = new TestConfig(Collections.emptyMap());
assertEquals(Long.valueOf(50L),
defaultConf.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 405ec68..c1adf19 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
@@ -27,7 +28,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.Fetcher;
-import org.apache.kafka.clients.consumer.internals.Heartbeat;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
@@ -1895,27 +1895,25 @@ public class KafkaConsumerTest {
ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time,
retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs);
- Heartbeat heartbeat = new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs);
- ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(
- loggerFactory,
- consumerClient,
- groupId,
- groupInstanceId,
- rebalanceTimeoutMs,
- sessionTimeoutMs,
- heartbeat,
- assignors,
- metadata,
- subscription,
- metrics,
- metricGroupPrefix,
- time,
- retryBackoffMs,
- autoCommitEnabled,
- autoCommitIntervalMs,
- interceptors,
- true);
-
+ GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+ rebalanceTimeoutMs,
+ heartbeatIntervalMs,
+ groupId,
+ groupInstanceId,
+ retryBackoffMs,
+ true);
+ ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(rebalanceConfig,
+ loggerFactory,
+ consumerClient,
+ assignors,
+ metadata,
+ subscription,
+ metrics,
+ metricGroupPrefix,
+ time,
+ autoCommitEnabled,
+ autoCommitIntervalMs,
+ interceptors);
Fetcher<String, String> fetcher = new Fetcher<>(
loggerFactory,
consumerClient,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 0fc5f62..659ef5f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
@@ -104,14 +105,30 @@ public class AbstractCoordinatorTest {
logContext, new ClusterResourceListeners());
this.mockClient = new MockClient(mockTime, metadata);
- this.consumerClient = new ConsumerNetworkClient(logContext, mockClient, metadata, mockTime,
- retryBackoffMs, REQUEST_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS);
+ this.consumerClient = new ConsumerNetworkClient(logContext,
+ mockClient,
+ metadata,
+ mockTime,
+ retryBackoffMs,
+ REQUEST_TIMEOUT_MS,
+ HEARTBEAT_INTERVAL_MS);
Metrics metrics = new Metrics();
mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, emptyMap()));
this.node = metadata.fetch().nodes().get(0);
this.coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
- this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime, rebalanceTimeoutMs, retryBackoffMs, groupInstanceId);
+
+ GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(SESSION_TIMEOUT_MS,
+ rebalanceTimeoutMs,
+ HEARTBEAT_INTERVAL_MS,
+ GROUP_ID,
+ groupInstanceId,
+ retryBackoffMs,
+ !groupInstanceId.isPresent());
+ this.coordinator = new DummyCoordinator(rebalanceConfig,
+ consumerClient,
+ metrics,
+ mockTime);
}
@Test
@@ -850,14 +867,11 @@ public class AbstractCoordinatorTest {
private int onJoinCompleteInvokes = 0;
private boolean wakeupOnJoinComplete = false;
- public DummyCoordinator(ConsumerNetworkClient client,
+ public DummyCoordinator(GroupRebalanceConfig rebalanceConfig,
+ ConsumerNetworkClient client,
Metrics metrics,
- Time time,
- int rebalanceTimeoutMs,
- int retryBackoffMs,
- Optional<String> groupInstanceId) {
- super(new LogContext(), client, GROUP_ID, groupInstanceId, rebalanceTimeoutMs,
- SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, !groupInstanceId.isPresent());
+ Time time) {
+ super(rebalanceConfig, new LogContext(), client, metrics, METRIC_GROUP_PREFIX, time);
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 4b377ea..c54540e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -118,8 +119,7 @@ public class ConsumerCoordinatorTest {
private final int autoCommitIntervalMs = 2000;
private final int requestTimeoutMs = 30000;
private final MockTime time = new MockTime();
- private final Heartbeat heartbeat = new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs,
- rebalanceTimeoutMs, retryBackoffMs);
+ private GroupRebalanceConfig rebalanceConfig;
private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
private List<PartitionAssignor> assignors = Collections.singletonList(partitionAssignor);
@@ -153,8 +153,21 @@ public class ConsumerCoordinatorTest {
this.rebalanceListener = new MockRebalanceListener();
this.mockOffsetCommitCallback = new MockCommitCallback();
this.partitionAssignor.clear();
+ this.rebalanceConfig = buildRebalanceConfig(Optional.empty());
+ this.coordinator = buildCoordinator(rebalanceConfig,
+ metrics,
+ assignors,
+ false);
+ }
- this.coordinator = buildCoordinator(metrics, assignors, false, Optional.empty());
+ private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstanceId) {
+ return new GroupRebalanceConfig(sessionTimeoutMs,
+ rebalanceTimeoutMs,
+ heartbeatIntervalMs,
+ groupId,
+ groupInstanceId,
+ retryBackoffMs,
+ !groupInstanceId.isPresent());
}
@After
@@ -741,10 +754,10 @@ public class ConsumerCoordinatorTest {
assertTrue(coordinator.coordinatorUnknown());
assertFalse(coordinator.poll(time.timer(0)));
- assertEquals(time.milliseconds(), heartbeat.lastPollTime());
+ assertEquals(time.milliseconds(), coordinator.heartbeat().lastPollTime());
time.sleep(rebalanceTimeoutMs - 1);
- assertFalse(heartbeat.pollTimeoutExpired(time.milliseconds()));
+ assertFalse(coordinator.heartbeat().pollTimeoutExpired(time.milliseconds()));
}
@Test
@@ -1044,9 +1057,6 @@ public class ConsumerCoordinatorTest {
@Test
public void testWakeupFromAssignmentCallback() {
- ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- false, Optional.empty());
-
final String topic = "topic1";
TopicPartition partition = new TopicPartition(topic, 0);
final String consumerId = "follower";
@@ -1162,7 +1172,10 @@ public class ConsumerCoordinatorTest {
metadata = new ConsumerMetadata(0, Long.MAX_VALUE, includeInternalTopics,
false, subscriptions, new LogContext(), new ClusterResourceListeners());
client = new MockClient(time, metadata);
- coordinator = buildCoordinator(new Metrics(), assignors, false, Optional.empty());
+ coordinator = buildCoordinator(rebalanceConfig,
+ new Metrics(),
+ assignors,
+ false);
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
@@ -1290,8 +1303,10 @@ public class ConsumerCoordinatorTest {
public void testAutoCommitDynamicAssignment() {
final String consumerId = "consumer";
- ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, groupInstanceId);
+ ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+ new Metrics(),
+ assignors,
+ true);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
@@ -1306,8 +1321,10 @@ public class ConsumerCoordinatorTest {
@Test
public void testAutoCommitRetryBackoff() {
final String consumerId = "consumer";
- ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, groupInstanceId);
+ ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+ new Metrics(),
+ assignors,
+ true);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
@@ -1340,7 +1357,10 @@ public class ConsumerCoordinatorTest {
@Test
public void testAutoCommitAwaitsInterval() {
final String consumerId = "consumer";
- ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, true, groupInstanceId);
+ ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+ new Metrics(),
+ assignors,
+ true);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
@@ -1378,8 +1398,10 @@ public class ConsumerCoordinatorTest {
public void testAutoCommitDynamicAssignmentRebalance() {
final String consumerId = "consumer";
- ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, groupInstanceId);
+ ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+ new Metrics(),
+ assignors,
+ true);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
@@ -1404,8 +1426,10 @@ public class ConsumerCoordinatorTest {
@Test
public void testAutoCommitManualAssignment() {
- ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, groupInstanceId);
+ ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+ new Metrics(),
+ assignors,
+ true);
subscriptions.assignFromUser(singleton(t1p));
subscriptions.seek(t1p, 100);
@@ -1421,8 +1445,10 @@ public class ConsumerCoordinatorTest {
@Test
public void testAutoCommitManualAssignmentCoordinatorUnknown() {
- ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, groupInstanceId);
+ ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+ new Metrics(),
+ assignors,
+ true);
subscriptions.assignFromUser(singleton(t1p));
subscriptions.seek(t1p, 100);
@@ -2066,8 +2092,10 @@ public class ConsumerCoordinatorTest {
@Test
public void testAutoCommitAfterCoordinatorBackToService() {
- ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- true, groupInstanceId);
+ ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+ new Metrics(),
+ assignors,
+ true);
subscriptions.assignFromUser(Collections.singleton(t1p));
subscriptions.seek(t1p, 100L);
@@ -2125,8 +2153,11 @@ public class ConsumerCoordinatorTest {
final boolean autoCommit,
final Optional<String> groupInstanceId) {
final String consumerId = "consumer";
- ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
- autoCommit, groupInstanceId);
+ rebalanceConfig = buildRebalanceConfig(groupInstanceId);
+ ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+ new Metrics(),
+ assignors,
+ autoCommit);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
if (useGroupManagement) {
@@ -2216,30 +2247,23 @@ public class ConsumerCoordinatorTest {
assertEquals("leaveGroupRequested should be " + shouldLeaveGroup, shouldLeaveGroup, leaveGroupRequested.get());
}
- private ConsumerCoordinator buildCoordinator(final Metrics metrics,
+ private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanceConfig,
+ final Metrics metrics,
final List<PartitionAssignor> assignors,
- final boolean autoCommitEnabled,
- final Optional<String> groupInstanceId) {
+ final boolean autoCommitEnabled) {
return new ConsumerCoordinator(
+ rebalanceConfig,
new LogContext(),
consumerClient,
- groupId,
- groupInstanceId,
- rebalanceTimeoutMs,
- sessionTimeoutMs,
- heartbeat,
assignors,
metadata,
subscriptions,
metrics,
"consumer" + groupId,
time,
- retryBackoffMs,
autoCommitEnabled,
autoCommitIntervalMs,
- null,
- !groupInstanceId.isPresent()
- );
+ null);
}
private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index c382de6..b014bec 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -16,23 +16,38 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.common.utils.MockTime;
+import org.junit.Before;
import org.junit.Test;
+import java.util.Optional;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class HeartbeatTest {
-
private int sessionTimeoutMs = 300;
private int heartbeatIntervalMs = 100;
private int maxPollIntervalMs = 900;
private long retryBackoffMs = 10L;
private MockTime time = new MockTime();
- private Heartbeat heartbeat = new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs,
- maxPollIntervalMs, retryBackoffMs);
+
+ private Heartbeat heartbeat;
+
+ @Before
+ public void setUp() {
+ GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+ maxPollIntervalMs,
+ heartbeatIntervalMs,
+ "group_id",
+ Optional.empty(),
+ retryBackoffMs,
+ true);
+ heartbeat = new Heartbeat(rebalanceConfig, time);
+ }
@Test
public void testShouldHeartbeat() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index c7ff6d8..bc3bc19 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -39,13 +39,13 @@ public class DistributedConfig extends WorkerConfig {
/**
* <code>group.id</code>
*/
- public static final String GROUP_ID_CONFIG = "group.id";
+ public static final String GROUP_ID_CONFIG = CommonClientConfigs.GROUP_ID_CONFIG;
private static final String GROUP_ID_DOC = "A unique string that identifies the Connect cluster group this worker belongs to.";
/**
* <code>session.timeout.ms</code>
*/
- public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
+ public static final String SESSION_TIMEOUT_MS_CONFIG = CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG;
private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect worker failures. " +
"The worker sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are " +
"received by the broker before the expiration of this session timeout, then the broker will remove the " +
@@ -56,7 +56,7 @@ public class DistributedConfig extends WorkerConfig {
/**
* <code>heartbeat.interval.ms</code>
*/
- public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
+ public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group " +
"coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the " +
"worker's session stays active and to facilitate rebalancing when new members join or leave the group. " +
@@ -66,11 +66,8 @@ public class DistributedConfig extends WorkerConfig {
/**
* <code>rebalance.timeout.ms</code>
*/
- public static final String REBALANCE_TIMEOUT_MS_CONFIG = "rebalance.timeout.ms";
- private static final String REBALANCE_TIMEOUT_MS_DOC = "The maximum allowed time for each worker to join the group " +
- "once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to " +
- "flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed " +
- "from the group, which will cause offset commit failures.";
+ public static final String REBALANCE_TIMEOUT_MS_CONFIG = CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG;
+ private static final String REBALANCE_TIMEOUT_MS_DOC = CommonClientConfigs.REBALANCE_TIMEOUT_MS_DOC;
/**
* <code>worker.sync.timeout.ms</code>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 230a272..0b855a0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -39,7 +40,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
@@ -70,33 +70,23 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
/**
* Initialize the coordination manager.
*/
- public WorkerCoordinator(LogContext logContext,
+ public WorkerCoordinator(GroupRebalanceConfig config,
+ LogContext logContext,
ConsumerNetworkClient client,
- String groupId,
- int rebalanceTimeoutMs,
- int sessionTimeoutMs,
- int heartbeatIntervalMs,
Metrics metrics,
String metricGrpPrefix,
Time time,
- long retryBackoffMs,
String restUrl,
ConfigBackingStore configStorage,
WorkerRebalanceListener listener,
ConnectProtocolCompatibility protocolCompatibility,
int maxDelay) {
- super(logContext,
+ super(config,
+ logContext,
client,
- groupId,
- Optional.empty(),
- rebalanceTimeoutMs,
- sessionTimeoutMs,
- heartbeatIntervalMs,
metrics,
metricGrpPrefix,
- time,
- retryBackoffMs,
- true);
+ time);
this.log = logContext.logger(WorkerCoordinator.class);
this.restUrl = restUrl;
this.configStorage = configStorage;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 99ea3a4..94cf97d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
@@ -79,8 +80,6 @@ public class WorkerGroupMember {
this.clientId = clientId;
this.log = logContext.logger(WorkerGroupMember.class);
- String groupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
-
Map<String, String> metricsTags = new LinkedHashMap<>();
metricsTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
@@ -124,16 +123,12 @@ public class WorkerGroupMember {
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
Integer.MAX_VALUE);
this.coordinator = new WorkerCoordinator(
+ new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.CONNECT),
logContext,
this.client,
- groupId,
- config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG),
- config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
- config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
metrics,
metricGrpPrefix,
this.time,
- retryBackoffMs,
restUrl,
configStorage,
listener,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
index f06976a..b1d3ec6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.distributed;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
@@ -45,6 +46,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
@@ -93,6 +95,7 @@ public class WorkerCoordinatorIncrementalTest {
private MockRebalanceListener rebalanceListener;
@Mock
private KafkaConfigBackingStore configStorage;
+ private GroupRebalanceConfig rebalanceConfig;
private WorkerCoordinator coordinator;
private int rebalanceDelay = DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT;
@@ -150,22 +153,24 @@ public class WorkerCoordinatorIncrementalTest {
this.configStorageCalls = 0;
- this.coordinator = new WorkerCoordinator(
- loggerFactory,
- consumerClient,
- groupId,
- rebalanceTimeoutMs,
- sessionTimeoutMs,
- heartbeatIntervalMs,
- metrics,
- "worker" + groupId,
- time,
- retryBackoffMs,
- expectedUrl(leaderId),
- configStorage,
- rebalanceListener,
- compatibility,
- rebalanceDelay);
+ this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+ rebalanceTimeoutMs,
+ heartbeatIntervalMs,
+ groupId,
+ Optional.empty(),
+ retryBackoffMs,
+ true);
+ this.coordinator = new WorkerCoordinator(rebalanceConfig,
+ loggerFactory,
+ consumerClient,
+ metrics,
+ "worker" + groupId,
+ time,
+ expectedUrl(leaderId),
+ configStorage,
+ rebalanceListener,
+ compatibility,
+ rebalanceDelay);
configState1 = clusterConfigState(offset, 2, 4);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 182d6bd..eac13d1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.distributed;
+import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
@@ -56,6 +57,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -90,6 +92,7 @@ public class WorkerCoordinatorTest {
private ConsumerNetworkClient consumerClient;
private MockRebalanceListener rebalanceListener;
@Mock private KafkaConfigBackingStore configStorage;
+ private GroupRebalanceConfig rebalanceConfig;
private WorkerCoordinator coordinator;
private ClusterConfigState configState1;
@@ -125,23 +128,24 @@ public class WorkerCoordinatorTest {
this.metrics = new Metrics(time);
this.rebalanceListener = new MockRebalanceListener();
this.configStorage = PowerMock.createMock(KafkaConfigBackingStore.class);
-
- this.coordinator = new WorkerCoordinator(
- logContext,
- consumerClient,
- groupId,
- rebalanceTimeoutMs,
- sessionTimeoutMs,
- heartbeatIntervalMs,
- metrics,
- "consumer" + groupId,
- time,
- retryBackoffMs,
- LEADER_URL,
- configStorage,
- rebalanceListener,
- compatibility,
- 0);
+ this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+ rebalanceTimeoutMs,
+ heartbeatIntervalMs,
+ groupId,
+ Optional.empty(),
+ retryBackoffMs,
+ true);
+ this.coordinator = new WorkerCoordinator(rebalanceConfig,
+ logContext,
+ consumerClient,
+ metrics,
+ "consumer" + groupId,
+ time,
+ LEADER_URL,
+ configStorage,
+ rebalanceListener,
+ compatibility,
+ 0);
configState1 = new ClusterConfigState(
1L,