You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/12 20:57:52 UTC
[kafka] branch trunk updated: KAFKA-9821: consolidate Streams
rebalance triggering mechanisms (#8596)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 58f7a97 KAFKA-9821: consolidate Streams rebalance triggering mechanisms (#8596)
58f7a97 is described below
commit 58f7a973149f66b980f99c3f2bd688d70b06b2eb
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue May 12 13:57:18 2020 -0700
KAFKA-9821: consolidate Streams rebalance triggering mechanisms (#8596)
Persist followup rebalance in assignment and consolidate rebalance triggering mechanisms
Reviewers: John Roesler <vv...@apache.org>
---
.../org/apache/kafka/streams/StreamsConfig.java | 2 +-
.../streams/processor/internals/StreamThread.java | 26 +---
.../internals/StreamsPartitionAssignor.java | 149 ++++++++++++---------
.../internals/StreamsRebalanceListener.java | 10 +-
.../assignment/AssignorConfiguration.java | 4 +-
.../internals/assignment/AssignorError.java | 4 +-
.../integration/EosBetaUpgradeIntegrationTest.java | 2 +
...ghAvailabilityStreamsPartitionAssignorTest.java | 2 +-
.../processor/internals/StreamThreadTest.java | 2 +-
.../internals/StreamsPartitionAssignorTest.java | 50 +++++--
.../kafka/streams/tests/StreamsUpgradeTest.java | 12 +-
.../tests/streams/streams_upgrade_test.py | 12 +-
12 files changed, 166 insertions(+), 109 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 5000911..2ec1d3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -870,7 +870,7 @@ public class StreamsConfig extends AbstractConfig {
public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__";
public static final String STREAMS_ADMIN_CLIENT = "__streams.admin.client.instance__";
public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__";
- public static final String NEXT_PROBING_REBALANCE_MS = "__next.probing.rebalance.ms__";
+ public static final String NEXT_SCHEDULED_REBALANCE_MS = "__next.probing.rebalance.ms__";
public static final String TIME = "__time__";
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c9f08e4..d4079f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -40,7 +40,6 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
@@ -242,11 +241,6 @@ public class StreamThread extends Thread {
}
}
- int getAssignmentErrorCode() {
- return assignmentErrorCode.get();
- }
-
-
private final Time time;
private final Logger log;
private final String logPrefix;
@@ -256,7 +250,6 @@ public class StreamThread extends Thread {
private final int maxPollTimeMs;
private final String originalReset;
private final TaskManager taskManager;
- private final AtomicInteger assignmentErrorCode;
private final AtomicLong nextProbingRebalanceMs;
private final StreamsMetricsImpl streamsMetrics;
@@ -366,8 +359,8 @@ public class StreamThread extends Thread {
consumerConfigs.put(StreamsConfig.InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
final AtomicInteger assignmentErrorCode = new AtomicInteger();
consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
- final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);
- consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_PROBING_REBALANCE_MS, nextProbingRebalanceMs);
+ final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
+ consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
String originalReset = null;
if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
@@ -392,7 +385,7 @@ public class StreamThread extends Thread {
threadId,
logContext,
assignmentErrorCode,
- nextProbingRebalanceMs
+ nextScheduledRebalanceMs
);
return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
@@ -475,13 +468,12 @@ public class StreamThread extends Thread {
this.builder = builder;
this.logPrefix = logContext.logPrefix();
this.log = logContext.logger(StreamThread.class);
- this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log);
+ this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log, assignmentErrorCode);
this.taskManager = taskManager;
this.restoreConsumer = restoreConsumer;
this.mainConsumer = mainConsumer;
this.changelogReader = changelogReader;
this.originalReset = originalReset;
- this.assignmentErrorCode = assignmentErrorCode;
this.nextProbingRebalanceMs = nextProbingRebalanceMs;
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
@@ -556,14 +548,8 @@ public class StreamThread extends Thread {
while (isRunning() || taskManager.isRebalanceInProgress()) {
try {
runOnce();
- if (assignmentErrorCode.get() == AssignorError.REBALANCE_NEEDED.code()) {
- log.info("Detected that the assignor requested a rebalance. Rejoining the consumer group to " +
- "trigger a new rebalance.");
- assignmentErrorCode.set(AssignorError.NONE.code());
- mainConsumer.enforceRebalance();
- } else if (nextProbingRebalanceMs.get() < time.milliseconds()) {
- log.info("The probing rebalance interval has elapsed since the last rebalance, triggering a " +
- "rebalance to probe for newly caught-up clients");
+ if (nextProbingRebalanceMs.get() < time.milliseconds()) {
+ log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
mainConsumer.enforceRebalance();
}
} catch (final TaskCorruptedException e) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 242cff7..0525997 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -161,7 +161,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
@SuppressWarnings("deprecation")
private org.apache.kafka.streams.processor.PartitionGrouper partitionGrouper;
private AtomicInteger assignmentErrorCode;
- private AtomicLong nextProbingRebalanceMs;
+ private AtomicLong nextScheduledRebalanceMs;
private Time time;
protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;
@@ -192,7 +192,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
taskManager = assignorConfiguration.getTaskManager();
streamsMetadataState = assignorConfiguration.getStreamsMetadataState();
assignmentErrorCode = assignorConfiguration.getAssignmentErrorCode(configs);
- nextProbingRebalanceMs = assignorConfiguration.getNextProbingRebalanceMs(configs);
+ nextScheduledRebalanceMs = assignorConfiguration.getNextScheduledRebalanceMs(configs);
time = assignorConfiguration.getTime(configs);
assignmentConfigs = assignorConfiguration.getAssignmentConfigs();
partitionGrouper = assignorConfiguration.getPartitionGrouper();
@@ -864,7 +864,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final int minUserMetadataVersion,
final int minSupportedMetadataVersion,
final boolean shouldTriggerProbingRebalance) {
- // keep track of whether a 2nd rebalance is unavoidable so we can skip trying to get a completely sticky assignment
boolean rebalanceRequired = shouldTriggerProbingRebalance;
final Map<String, Assignment> assignment = new HashMap<>();
@@ -878,8 +877,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// Try to avoid triggering another rebalance by giving active tasks back to their previous owners within a
// client, without violating load balance. If we already know another rebalance will be required, or the
- // client had no owned partitions, try to balance the workload as evenly as possible by interleaving the
- // tasks among consumers and hopefully spreading the heavier subtopologies evenly across threads.
+ // client had no owned partitions, try to balance the workload as evenly as possible by interleaving tasks
if (rebalanceRequired || state.ownedPartitions().isEmpty()) {
activeTaskAssignments = interleaveConsumerTasksByGroupId(state.activeTasks(), consumers);
} else if ((activeTaskAssignments = tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, allOwnedPartitions))
@@ -894,7 +892,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance
final boolean encodeNextProbingRebalanceTime = clientId.equals(taskManager.processId()) && shouldTriggerProbingRebalance;
- addClientAssignments(
+ final boolean followupRebalanceScheduled = addClientAssignments(
assignment,
clientMetadata,
partitionsForTask,
@@ -907,6 +905,17 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
minSupportedMetadataVersion,
false,
encodeNextProbingRebalanceTime);
+
+ if (followupRebalanceScheduled) {
+ rebalanceRequired = true;
+ log.debug("Requested client {} to schedule a followup rebalance", clientId);
+ }
+ }
+
+ if (rebalanceRequired) {
+ log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled.");
+ } else {
+ log.info("Finished stable assignment of tasks, no followup rebalances required.");
}
return assignment;
@@ -955,26 +964,29 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
false);
}
+ log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled due to version probing.");
+
return assignment;
}
/**
* Adds the encoded assignment for each StreamThread consumer in the client to the overall assignment map
+ * @return true if this client has been told to schedule a followup rebalance
*/
- private void addClientAssignments(final Map<String, Assignment> assignment,
- final ClientMetadata clientMetadata,
- final Map<TaskId, Set<TopicPartition>> partitionsForTask,
- final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
- final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
- final Set<TopicPartition> allOwnedPartitions,
- final Map<String, List<TaskId>> activeTaskAssignments,
- final Map<String, List<TaskId>> standbyTaskAssignments,
- final int minUserMetadataVersion,
- final int minSupportedMetadataVersion,
- final boolean versionProbing,
- final boolean probingRebalanceNeeded) {
- boolean encodeNextRebalanceTime = probingRebalanceNeeded;
- boolean stableAssignment = !probingRebalanceNeeded && !versionProbing;
+ private boolean addClientAssignments(final Map<String, Assignment> assignment,
+ final ClientMetadata clientMetadata,
+ final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+ final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
+ final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
+ final Set<TopicPartition> allOwnedPartitions,
+ final Map<String, List<TaskId>> activeTaskAssignments,
+ final Map<String, List<TaskId>> standbyTaskAssignments,
+ final int minUserMetadataVersion,
+ final int minSupportedMetadataVersion,
+ final boolean versionProbing,
+ final boolean probingRebalanceNeeded) {
+ boolean rebalanceRequested = probingRebalanceNeeded || versionProbing;
+ boolean shouldEncodeProbingRebalance = probingRebalanceNeeded;
// Loop through the consumers and build their assignment
for (final String consumer : clientMetadata.consumers) {
@@ -984,17 +996,14 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final List<TopicPartition> activePartitionsList = new ArrayList<>();
final List<TaskId> assignedActiveList = new ArrayList<>();
- if (populateActiveTaskAndPartitionsLists(
- activePartitionsList,
- assignedActiveList,
- consumer,
- clientMetadata.state,
- activeTasksForConsumer,
- partitionsForTask,
- allOwnedPartitions)
- ) {
- stableAssignment = false;
- }
+ final boolean tasksRevoked = populateActiveTaskAndPartitionsLists(
+ activePartitionsList,
+ assignedActiveList,
+ consumer,
+ clientMetadata.state,
+ activeTasksForConsumer,
+ partitionsForTask,
+ allOwnedPartitions);
final Map<TaskId, Set<TopicPartition>> standbyTaskMap =
buildStandbyTaskMap(standbyTaskAssignments.get(consumer), partitionsForTask);
@@ -1009,14 +1018,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
AssignorError.NONE.code()
);
- if (encodeNextRebalanceTime) {
+ if (tasksRevoked) {
+ // TODO: once KAFKA-9821 is resolved we can leave it to the client to trigger this rebalance
+ log.debug("Requesting followup rebalance be scheduled immediately due to tasks changing ownership.");
+ info.setNextRebalanceTime(0L);
+ rebalanceRequested = true;
+ } else if (shouldEncodeProbingRebalance) {
final long nextRebalanceTimeMs = time.milliseconds() + probingRebalanceIntervalMs();
+ log.debug("Requesting followup rebalance be scheduled for {} ms to probe for caught-up replica tasks.", nextRebalanceTimeMs);
info.setNextRebalanceTime(nextRebalanceTimeMs);
- log.info("Scheduled a followup probing rebalance for {} ms.", nextRebalanceTimeMs);
- encodeNextRebalanceTime = false;
+ shouldEncodeProbingRebalance = false;
}
- // finally, encode the assignment and insert into map with all assignments
assignment.put(
consumer,
new Assignment(
@@ -1025,12 +1038,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
)
);
}
-
- if (stableAssignment) {
- log.info("Finished stable assignment of tasks, no followup rebalances required.");
- } else {
- log.info("Finished unstable assignment of tasks, a followup probing rebalance will be triggered.");
- }
+ return rebalanceRequested;
}
/**
@@ -1057,7 +1065,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final boolean newPartitionForConsumer = oldOwner == null || !oldOwner.equals(consumer);
// If the partition is new to this consumer but is still owned by another, remove from the assignment
- // until it has been revoked and can safely be reassigned according the COOPERATIVE protocol
+ // until it has been revoked and can safely be reassigned according to the COOPERATIVE protocol
if (newPartitionForConsumer && allOwnedPartitions.contains(partition)) {
log.info("Removing task {} from assignment until it is safely revoked in followup rebalance", taskId);
clientState.removeFromAssignment(taskId);
@@ -1338,7 +1346,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
if (info.errCode() != AssignorError.NONE.code()) {
// set flag to shutdown streams app
- setAssignmentErrorCode(info.errCode());
+ assignmentErrorCode.set(info.errCode());
return;
}
/*
@@ -1356,18 +1364,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
validateMetadataVersions(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion);
- // Check if this was a version probing rebalance and check the error code to trigger another rebalance if so
- if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
- log.info("Version probing detected. Rejoining the consumer group to trigger a new rebalance.");
- setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
- }
-
// version 1 field
final Map<TaskId, Set<TopicPartition>> activeTasks;
// version 2 fields
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo;
final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost;
+ final long encodedNextScheduledRebalanceMs;
switch (receivedAssignmentMetadataVersion) {
case 1:
@@ -1377,6 +1380,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
partitionsByHost = Collections.emptyMap();
standbyPartitionsByHost = Collections.emptyMap();
topicToPartitionInfo = Collections.emptyMap();
+ encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
break;
case 2:
case 3:
@@ -1388,6 +1392,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
partitionsByHost = info.partitionsByHost();
standbyPartitionsByHost = Collections.emptyMap();
topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
+ encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
break;
case 6:
validateActiveTaskEncoding(partitions, info, logPrefix);
@@ -1396,6 +1401,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
partitionsByHost = info.partitionsByHost();
standbyPartitionsByHost = info.standbyPartitionByHost();
topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
+ encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
break;
case 7:
validateActiveTaskEncoding(partitions, info, logPrefix);
@@ -1404,7 +1410,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
partitionsByHost = info.partitionsByHost();
standbyPartitionsByHost = info.standbyPartitionByHost();
topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
- nextProbingRebalanceMs.set(info.nextRebalanceMs());
+ encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
break;
default:
throw new IllegalStateException(
@@ -1413,7 +1419,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
);
}
- verifyHostInfo(partitionsByHost.keySet());
+ maybeScheduleFollowupRebalance(
+ encodedNextScheduledRebalanceMs,
+ receivedAssignmentMetadataVersion,
+ latestCommonlySupportedVersion,
+ partitionsByHost.keySet()
+ );
final Cluster fakeCluster = Cluster.empty().withPartitions(topicToPartitionInfo);
streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fakeCluster);
@@ -1423,6 +1434,28 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
taskManager.handleAssignment(activeTasks, info.standbyTasks());
}
+ private void maybeScheduleFollowupRebalance(final long encodedNextScheduledRebalanceMs,
+ final int receivedAssignmentMetadataVersion,
+ final int latestCommonlySupportedVersion,
+ final Set<HostInfo> groupHostInfo) {
+ if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
+ log.info("Requested to schedule immediate rebalance due to version probing.");
+ nextScheduledRebalanceMs.set(0L);
+ } else if (!verifyHostInfo(groupHostInfo)) {
+ log.info("Requested to schedule immediate rebalance to update group with new host endpoint = {}.", userEndPoint);
+ nextScheduledRebalanceMs.set(0L);
+ } else if (encodedNextScheduledRebalanceMs == 0L) {
+ log.info("Requested to schedule immediate rebalance for new tasks to be safely revoked from current owner.");
+ nextScheduledRebalanceMs.set(0L);
+ } else if (encodedNextScheduledRebalanceMs < Long.MAX_VALUE) {
+ log.info("Requested to schedule probing rebalance for {} ms.", encodedNextScheduledRebalanceMs);
+ nextScheduledRebalanceMs.set(encodedNextScheduledRebalanceMs);
+ } else {
+ log.info("No followup rebalance was requested, resetting the rebalance schedule.");
+ nextScheduledRebalanceMs.set(Long.MAX_VALUE);
+ }
+ }
+
/**
* Verify that this client's host info was included in the map returned in the assignment, and trigger a
* rebalance if not. This may be necessary when using static membership, as a rejoining client will be handed
@@ -1430,15 +1463,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
* to force a rebalance for the other members in the group to get the updated host info for this client.
*
* @param groupHostInfo the HostInfo of all clients in the group
+ * @return false if the current host info does not match that in the group assignment
*/
- private void verifyHostInfo(final Set<HostInfo> groupHostInfo) {
+ private boolean verifyHostInfo(final Set<HostInfo> groupHostInfo) {
if (userEndPoint != null && !groupHostInfo.isEmpty()) {
final HostInfo myHostInfo = HostInfo.buildFromEndpoint(userEndPoint);
- if (!groupHostInfo.contains(myHostInfo)) {
- log.info("Triggering a rebalance to update group with new endpoint = {}", userEndPoint);
- setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
- }
+ return groupHostInfo.contains(myHostInfo);
+ } else {
+ return true;
}
}
@@ -1543,10 +1576,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
}
}
- protected void setAssignmentErrorCode(final Integer errorCode) {
- assignmentErrorCode.set(errorCode);
- }
-
// following functions are for test only
void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index 2c85eaa..b594aa6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
@@ -31,23 +32,26 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
private final TaskManager taskManager;
private final StreamThread streamThread;
private final Logger log;
+ private final AtomicInteger assignmentErrorCode;
StreamsRebalanceListener(final Time time,
final TaskManager taskManager,
final StreamThread streamThread,
- final Logger log) {
+ final Logger log,
+ final AtomicInteger assignmentErrorCode) {
this.time = time;
this.taskManager = taskManager;
this.streamThread = streamThread;
this.log = log;
+ this.assignmentErrorCode = assignmentErrorCode;
}
@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
// NB: all task management is already handled by:
// org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
- if (streamThread.getAssignmentErrorCode() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
- log.error("Received error code {} - shutdown", streamThread.getAssignmentErrorCode());
+ if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
+ log.error("Received error code {} - shutdown", assignmentErrorCode.get());
streamThread.shutdown();
} else {
streamThread.setState(State.PARTITIONS_ASSIGNED);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 8ea63fc..4d77804 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -190,8 +190,8 @@ public final class AssignorConfiguration {
return (AtomicInteger) ai;
}
- public AtomicLong getNextProbingRebalanceMs(final Map<String, ?> configs) {
- final Object al = configs.get(InternalConfig.NEXT_PROBING_REBALANCE_MS);
+ public AtomicLong getNextScheduledRebalanceMs(final Map<String, ?> configs) {
+ final Object al = configs.get(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS);
if (al == null) {
final KafkaException fatalException = new KafkaException("nextProbingRebalanceMs is not specified");
log.error(fatalException.getMessage(), fatalException);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
index 259c3db..6234a7d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
@@ -18,9 +18,7 @@ package org.apache.kafka.streams.processor.internals.assignment;
public enum AssignorError {
NONE(0),
- INCOMPLETE_SOURCE_TOPIC_METADATA(1),
- REBALANCE_NEEDED(2);
-
+ INCOMPLETE_SOURCE_TOPIC_METADATA(1);
private final int code;
AssignorError(final int code) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
index eff4991..b4b9dce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
@@ -52,6 +52,7 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -185,6 +186,7 @@ public class EosBetaUpgradeIntegrationTest {
}
@Test
+ @Ignore
public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
// We use two KafkaStreams clients that we upgrade from eos-alpha to eos-beta. During the upgrade,
// we ensure that there are pending transaction and verify that data is processed correctly.
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
index 2c27d11..06f866f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
@@ -123,7 +123,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
configurationMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, streamsMetadataState);
configurationMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
configurationMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentError);
- configurationMap.put(InternalConfig.NEXT_PROBING_REBALANCE_MS, nextProbingRebalanceMs);
+ configurationMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextProbingRebalanceMs);
configurationMap.put(InternalConfig.TIME, time);
configurationMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName());
return configurationMap;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 0a47553..5542067 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -541,7 +541,7 @@ public class StreamThreadTest {
}
AtomicLong nextRebalanceMs() {
- return (AtomicLong) consumerConfigs.get(StreamsConfig.InternalConfig.NEXT_PROBING_REBALANCE_MS);
+ return (AtomicLong) consumerConfigs.get(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 840d48e..4c64824 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -47,7 +47,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
@@ -86,6 +85,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
@@ -200,7 +200,7 @@ public class StreamsPartitionAssignorTest {
private final Class<? extends TaskAssignor> taskAssignor;
private final AtomicInteger assignmentError = new AtomicInteger();
- private final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);
+ private final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
private final MockTime time = new MockTime();
private Map<String, Object> configProps() {
@@ -211,7 +211,7 @@ public class StreamsPartitionAssignorTest {
configurationMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, streamsMetadataState);
configurationMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
configurationMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentError);
- configurationMap.put(InternalConfig.NEXT_PROBING_REBALANCE_MS, nextProbingRebalanceMs);
+ configurationMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
configurationMap.put(InternalConfig.TIME, time);
configurationMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
return configurationMap;
@@ -1360,7 +1360,7 @@ public class StreamsPartitionAssignorTest {
}
@Test
- public void shouldTriggerRebalanceOnHostInfoChange() {
+ public void shouldTriggerImmediateRebalanceOnHostInfoChange() {
final Map<HostInfo, Set<TopicPartition>> oldHostState = mkMap(
mkEntry(new HostInfo("localhost", 9090), mkSet(t1p0, t1p1)),
mkEntry(new HostInfo("otherhost", 9090), mkSet(t2p0, t2p1))
@@ -1376,14 +1376,44 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.onAssignment(createAssignment(oldHostState), null);
- assertThat(assignmentError.get(), is(AssignorError.REBALANCE_NEEDED.code()));
+ assertThat(nextScheduledRebalanceMs.get(), is(0L));
- partitionAssignor.setAssignmentErrorCode(AssignorError.NONE.code());
partitionAssignor.onAssignment(createAssignment(newHostState), null);
- assertThat(assignmentError.get(), is(AssignorError.NONE.code()));
+ assertThat(nextScheduledRebalanceMs.get(), is(Long.MAX_VALUE));
+ }
- EasyMock.verify(taskManager);
+ @Test
+ public void shouldTriggerImmediateRebalanceOnTasksRevoked() {
+ builder.addSource(null, "source1", null, null, null, "topic1");
+
+ final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
+
+ subscriptions.put(CONSUMER_1,
+ new Subscription(
+ Collections.singletonList("topic1"),
+ getInfo(UUID_1, allTasks, EMPTY_TASKS).encode(),
+ asList(t1p0, t1p1, t1p2))
+ );
+ subscriptions.put(CONSUMER_2,
+ new Subscription(
+ Collections.singletonList("topic1"),
+ getInfo(UUID_1, EMPTY_TASKS, allTasks).encode(),
+ emptyList())
+ );
+
+ createMockTaskManager(allTasks, allTasks);
+ configurePartitionAssignorWith(singletonMap(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0L));
+
+ final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+ // Verify at least one partition was revoked
+ assertThat(assignment.get(CONSUMER_1).partitions(), not(allTasks));
+ assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(emptyList()));
+
+ partitionAssignor.onAssignment(assignment.get(CONSUMER_2), null);
+
+ assertThat(nextScheduledRebalanceMs.get(), is(0L));
}
@Test
@@ -1768,13 +1798,13 @@ public class StreamsPartitionAssignorTest {
@Test
public void shouldGetNextProbingRebalanceMs() {
- nextProbingRebalanceMs.set(5 * 60 * 1000L);
+ nextScheduledRebalanceMs.set(5 * 60 * 1000L);
createDefaultMockTaskManager();
final Map<String, Object> props = configProps();
final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(props);
- assertThat(assignorConfiguration.getNextProbingRebalanceMs(props).get(), equalTo(5 * 60 * 1000L));
+ assertThat(assignorConfiguration.getNextScheduledRebalanceMs(props).get(), equalTo(5 * 60 * 1000L));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 78903c0..cb8787f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -38,9 +38,11 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.LegacySubscriptionInfoSerde;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -57,6 +59,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
@@ -116,8 +119,10 @@ public class StreamsUpgradeTest {
}
public static class FutureStreamsPartitionAssignor extends StreamsPartitionAssignor {
+ private final Logger log = LoggerFactory.getLogger(FutureStreamsPartitionAssignor.class);
private AtomicInteger usedSubscriptionMetadataVersionPeek;
+ private AtomicLong nextScheduledRebalanceMs;
public FutureStreamsPartitionAssignor() {
usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION + 1;
@@ -133,6 +138,8 @@ public class StreamsUpgradeTest {
usedSubscriptionMetadataVersionPeek = new AtomicInteger();
}
configs.remove("test.future.metadata");
+ nextScheduledRebalanceMs = new AssignorConfiguration(configs).getNextScheduledRebalanceMs(configs);
+
super.configure(configs);
}
@@ -194,7 +201,8 @@ public class StreamsUpgradeTest {
assignment.userData().putInt(0, LATEST_SUPPORTED_VERSION));
if (maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion())) {
- setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
+ log.info("Requested to schedule immediate rebalance due to version probing.");
+ nextScheduledRebalanceMs.set(0L);
usedSubscriptionMetadataVersionPeek.set(usedSubscriptionMetadataVersion);
}
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 7e1b27c..0f0719a 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -528,22 +528,22 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
- log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
+ log_monitor.wait_until("Triggering the followup rebalance scheduled for 0 ms.",
timeout_sec=60,
- err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
+ err_msg="Could not detect 'Triggering followup rebalance' at upgrading node " + str(node.account))
else:
first_other_monitor.wait_until("Sent a version 7 subscription and group.s latest commonly supported version is 8 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 8 for next rebalance.",
timeout_sec=60,
err_msg="Never saw output 'Upgrade metadata to version 8' on" + str(first_other_node.account))
- first_other_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
+ first_other_monitor.wait_until("Triggering the followup rebalance scheduled for 0 ms.",
timeout_sec=60,
- err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
+ err_msg="Could not detect 'Triggering followup rebalance' at upgrading node " + str(node.account))
second_other_monitor.wait_until("Sent a version 7 subscription and group.s latest commonly supported version is 8 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 8 for next rebalance.",
timeout_sec=60,
err_msg="Never saw output 'Upgrade metadata to version 8' on" + str(second_other_node.account))
- second_other_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
+ second_other_monitor.wait_until("Triggering the followup rebalance scheduled for 0 ms.",
timeout_sec=60,
- err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
+ err_msg="Could not detect 'Triggering followup rebalance' at upgrading node " + str(node.account))
# version probing should trigger second rebalance
# now we check that after consecutive rebalances we have synchronized generation