You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/12 20:57:52 UTC

[kafka] branch trunk updated: KAFKA-9821: consolidate Streams rebalance triggering mechanisms (#8596)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 58f7a97  KAFKA-9821: consolidate Streams rebalance triggering mechanisms (#8596)
58f7a97 is described below

commit 58f7a973149f66b980f99c3f2bd688d70b06b2eb
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue May 12 13:57:18 2020 -0700

    KAFKA-9821: consolidate Streams rebalance triggering mechanisms (#8596)
    
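    Persist the scheduled followup rebalance time in the assignment and consolidate the rebalance triggering mechanisms into a single next-scheduled-rebalance timestamp, replacing the REBALANCE_NEEDED assignor error code.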
    
    Reviewers: John Roesler <vv...@apache.org>
---
 .../org/apache/kafka/streams/StreamsConfig.java    |   2 +-
 .../streams/processor/internals/StreamThread.java  |  26 +---
 .../internals/StreamsPartitionAssignor.java        | 149 ++++++++++++---------
 .../internals/StreamsRebalanceListener.java        |  10 +-
 .../assignment/AssignorConfiguration.java          |   4 +-
 .../internals/assignment/AssignorError.java        |   4 +-
 .../integration/EosBetaUpgradeIntegrationTest.java |   2 +
 ...ghAvailabilityStreamsPartitionAssignorTest.java |   2 +-
 .../processor/internals/StreamThreadTest.java      |   2 +-
 .../internals/StreamsPartitionAssignorTest.java    |  50 +++++--
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  12 +-
 .../tests/streams/streams_upgrade_test.py          |  12 +-
 12 files changed, 166 insertions(+), 109 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 5000911..2ec1d3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -870,7 +870,7 @@ public class StreamsConfig extends AbstractConfig {
         public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__";
         public static final String STREAMS_ADMIN_CLIENT = "__streams.admin.client.instance__";
         public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__";
-        public static final String NEXT_PROBING_REBALANCE_MS = "__next.probing.rebalance.ms__";
+        public static final String NEXT_SCHEDULED_REBALANCE_MS = "__next.probing.rebalance.ms__";
         public static final String TIME = "__time__";
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c9f08e4..d4079f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -40,7 +40,6 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
 import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
@@ -242,11 +241,6 @@ public class StreamThread extends Thread {
         }
     }
 
-    int getAssignmentErrorCode() {
-        return assignmentErrorCode.get();
-    }
-
-
     private final Time time;
     private final Logger log;
     private final String logPrefix;
@@ -256,7 +250,6 @@ public class StreamThread extends Thread {
     private final int maxPollTimeMs;
     private final String originalReset;
     private final TaskManager taskManager;
-    private final AtomicInteger assignmentErrorCode;
     private final AtomicLong nextProbingRebalanceMs;
 
     private final StreamsMetricsImpl streamsMetrics;
@@ -366,8 +359,8 @@ public class StreamThread extends Thread {
         consumerConfigs.put(StreamsConfig.InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
         final AtomicInteger assignmentErrorCode = new AtomicInteger();
         consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
-        final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);
-        consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_PROBING_REBALANCE_MS, nextProbingRebalanceMs);
+        final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
+        consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
         String originalReset = null;
         if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
             originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
@@ -392,7 +385,7 @@ public class StreamThread extends Thread {
             threadId,
             logContext,
             assignmentErrorCode,
-            nextProbingRebalanceMs
+            nextScheduledRebalanceMs
         );
 
         return streamThread.updateThreadMetadata(getSharedAdminClientId(clientId));
@@ -475,13 +468,12 @@ public class StreamThread extends Thread {
         this.builder = builder;
         this.logPrefix = logContext.logPrefix();
         this.log = logContext.logger(StreamThread.class);
-        this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log);
+        this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log, assignmentErrorCode);
         this.taskManager = taskManager;
         this.restoreConsumer = restoreConsumer;
         this.mainConsumer = mainConsumer;
         this.changelogReader = changelogReader;
         this.originalReset = originalReset;
-        this.assignmentErrorCode = assignmentErrorCode;
         this.nextProbingRebalanceMs = nextProbingRebalanceMs;
 
         this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
@@ -556,14 +548,8 @@ public class StreamThread extends Thread {
         while (isRunning() || taskManager.isRebalanceInProgress()) {
             try {
                 runOnce();
-                if (assignmentErrorCode.get() == AssignorError.REBALANCE_NEEDED.code()) {
-                    log.info("Detected that the assignor requested a rebalance. Rejoining the consumer group to " +
-                                 "trigger a new rebalance.");
-                    assignmentErrorCode.set(AssignorError.NONE.code());
-                    mainConsumer.enforceRebalance();
-                } else if (nextProbingRebalanceMs.get() < time.milliseconds()) {
-                    log.info("The probing rebalance interval has elapsed since the last rebalance, triggering a " +
-                                "rebalance to probe for newly caught-up clients");
+                if (nextProbingRebalanceMs.get() < time.milliseconds()) {
+                    log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
                     mainConsumer.enforceRebalance();
                 }
             } catch (final TaskCorruptedException e) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 242cff7..0525997 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -161,7 +161,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
     @SuppressWarnings("deprecation")
     private org.apache.kafka.streams.processor.PartitionGrouper partitionGrouper;
     private AtomicInteger assignmentErrorCode;
-    private AtomicLong nextProbingRebalanceMs;
+    private AtomicLong nextScheduledRebalanceMs;
     private Time time;
 
     protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;
@@ -192,7 +192,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         taskManager = assignorConfiguration.getTaskManager();
         streamsMetadataState = assignorConfiguration.getStreamsMetadataState();
         assignmentErrorCode = assignorConfiguration.getAssignmentErrorCode(configs);
-        nextProbingRebalanceMs = assignorConfiguration.getNextProbingRebalanceMs(configs);
+        nextScheduledRebalanceMs = assignorConfiguration.getNextScheduledRebalanceMs(configs);
         time = assignorConfiguration.getTime(configs);
         assignmentConfigs = assignorConfiguration.getAssignmentConfigs();
         partitionGrouper = assignorConfiguration.getPartitionGrouper();
@@ -864,7 +864,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                                                          final int minUserMetadataVersion,
                                                          final int minSupportedMetadataVersion,
                                                          final boolean shouldTriggerProbingRebalance) {
-        // keep track of whether a 2nd rebalance is unavoidable so we can skip trying to get a completely sticky assignment
         boolean rebalanceRequired = shouldTriggerProbingRebalance;
         final Map<String, Assignment> assignment = new HashMap<>();
 
@@ -878,8 +877,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
 
             // Try to avoid triggering another rebalance by giving active tasks back to their previous owners within a
             // client, without violating load balance. If we already know another rebalance will be required, or the
-            // client had no owned partitions, try to balance the workload as evenly as possible by interleaving the
-            // tasks among consumers and hopefully spreading the heavier subtopologies evenly across threads.
+            // client had no owned partitions, try to balance the workload as evenly as possible by interleaving tasks
             if (rebalanceRequired || state.ownedPartitions().isEmpty()) {
                 activeTaskAssignments = interleaveConsumerTasksByGroupId(state.activeTasks(), consumers);
             } else if ((activeTaskAssignments = tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, allOwnedPartitions))
@@ -894,7 +892,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             // Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance
             final boolean encodeNextProbingRebalanceTime = clientId.equals(taskManager.processId()) && shouldTriggerProbingRebalance;
 
-            addClientAssignments(
+            final boolean followupRebalanceScheduled = addClientAssignments(
                 assignment,
                 clientMetadata,
                 partitionsForTask,
@@ -907,6 +905,17 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 minSupportedMetadataVersion,
                 false,
                 encodeNextProbingRebalanceTime);
+
+            if (followupRebalanceScheduled) {
+                rebalanceRequired = true;
+                log.debug("Requested client {} to schedule a followup rebalance", clientId);
+            }
+        }
+
+        if (rebalanceRequired) {
+            log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled.");
+        } else {
+            log.info("Finished stable assignment of tasks, no followup rebalances required.");
         }
 
         return assignment;
@@ -955,26 +964,29 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 false);
         }
 
+        log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled due to version probing.");
+
         return assignment;
     }
 
     /**
      * Adds the encoded assignment for each StreamThread consumer in the client to the overall assignment map
+     * @return true if this client has been told to schedule a followup rebalance
      */
-    private void addClientAssignments(final Map<String, Assignment> assignment,
-                                      final ClientMetadata clientMetadata,
-                                      final Map<TaskId, Set<TopicPartition>> partitionsForTask,
-                                      final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
-                                      final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
-                                      final Set<TopicPartition> allOwnedPartitions,
-                                      final Map<String, List<TaskId>> activeTaskAssignments,
-                                      final Map<String, List<TaskId>> standbyTaskAssignments,
-                                      final int minUserMetadataVersion,
-                                      final int minSupportedMetadataVersion,
-                                      final boolean versionProbing,
-                                      final boolean probingRebalanceNeeded) {
-        boolean encodeNextRebalanceTime = probingRebalanceNeeded;
-        boolean stableAssignment = !probingRebalanceNeeded && !versionProbing;
+    private boolean addClientAssignments(final Map<String, Assignment> assignment,
+                                         final ClientMetadata clientMetadata,
+                                         final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
+                                         final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
+                                         final Set<TopicPartition> allOwnedPartitions,
+                                         final Map<String, List<TaskId>> activeTaskAssignments,
+                                         final Map<String, List<TaskId>> standbyTaskAssignments,
+                                         final int minUserMetadataVersion,
+                                         final int minSupportedMetadataVersion,
+                                         final boolean versionProbing,
+                                         final boolean probingRebalanceNeeded) {
+        boolean rebalanceRequested = probingRebalanceNeeded || versionProbing;
+        boolean shouldEncodeProbingRebalance = probingRebalanceNeeded;
 
         // Loop through the consumers and build their assignment
         for (final String consumer : clientMetadata.consumers) {
@@ -984,17 +996,14 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             final List<TopicPartition> activePartitionsList = new ArrayList<>();
             final List<TaskId> assignedActiveList = new ArrayList<>();
 
-            if (populateActiveTaskAndPartitionsLists(
-                    activePartitionsList,
-                    assignedActiveList,
-                    consumer,
-                    clientMetadata.state,
-                    activeTasksForConsumer,
-                    partitionsForTask,
-                    allOwnedPartitions)
-            ) {
-                stableAssignment = false;
-            }
+            final boolean tasksRevoked = populateActiveTaskAndPartitionsLists(
+                activePartitionsList,
+                assignedActiveList,
+                consumer,
+                clientMetadata.state,
+                activeTasksForConsumer,
+                partitionsForTask,
+                allOwnedPartitions);
 
             final Map<TaskId, Set<TopicPartition>> standbyTaskMap =
                 buildStandbyTaskMap(standbyTaskAssignments.get(consumer), partitionsForTask);
@@ -1009,14 +1018,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 AssignorError.NONE.code()
             );
 
-            if (encodeNextRebalanceTime) {
+            if (tasksRevoked) {
+                // TODO: once KAFKA-9821 is resolved we can leave it to the client to trigger this rebalance
+                log.debug("Requesting followup rebalance be scheduled immediately due to tasks changing ownership.");
+                info.setNextRebalanceTime(0L);
+                rebalanceRequested = true;
+            } else if (shouldEncodeProbingRebalance) {
                 final long nextRebalanceTimeMs = time.milliseconds() + probingRebalanceIntervalMs();
+                log.debug("Requesting followup rebalance be scheduled for {} ms to probe for caught-up replica tasks.", nextRebalanceTimeMs);
                 info.setNextRebalanceTime(nextRebalanceTimeMs);
-                log.info("Scheduled a followup probing rebalance for {} ms.", nextRebalanceTimeMs);
-                encodeNextRebalanceTime = false;
+                shouldEncodeProbingRebalance = false;
             }
 
-            // finally, encode the assignment and insert into map with all assignments
             assignment.put(
                 consumer,
                 new Assignment(
@@ -1025,12 +1038,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 )
             );
         }
-
-        if (stableAssignment) {
-            log.info("Finished stable assignment of tasks, no followup rebalances required.");
-        } else {
-            log.info("Finished unstable assignment of tasks, a followup probing rebalance will be triggered.");
-        }
+        return rebalanceRequested;
     }
 
     /**
@@ -1057,7 +1065,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 final boolean newPartitionForConsumer = oldOwner == null || !oldOwner.equals(consumer);
 
                 // If the partition is new to this consumer but is still owned by another, remove from the assignment
-                // until it has been revoked and can safely be reassigned according the COOPERATIVE protocol
+                // until it has been revoked and can safely be reassigned according to the COOPERATIVE protocol
                 if (newPartitionForConsumer && allOwnedPartitions.contains(partition)) {
                     log.info("Removing task {} from assignment until it is safely revoked in followup rebalance", taskId);
                     clientState.removeFromAssignment(taskId);
@@ -1338,7 +1346,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
         if (info.errCode() != AssignorError.NONE.code()) {
             // set flag to shutdown streams app
-            setAssignmentErrorCode(info.errCode());
+            assignmentErrorCode.set(info.errCode());
             return;
         }
         /*
@@ -1356,18 +1364,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
 
         validateMetadataVersions(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion);
 
-        // Check if this was a version probing rebalance and check the error code to trigger another rebalance if so
-        if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
-            log.info("Version probing detected. Rejoining the consumer group to trigger a new rebalance.");
-            setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-        }
-
         // version 1 field
         final Map<TaskId, Set<TopicPartition>> activeTasks;
         // version 2 fields
         final Map<TopicPartition, PartitionInfo> topicToPartitionInfo;
         final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
         final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost;
+        final long encodedNextScheduledRebalanceMs;
 
         switch (receivedAssignmentMetadataVersion) {
             case 1:
@@ -1377,6 +1380,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 partitionsByHost = Collections.emptyMap();
                 standbyPartitionsByHost = Collections.emptyMap();
                 topicToPartitionInfo = Collections.emptyMap();
+                encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
                 break;
             case 2:
             case 3:
@@ -1388,6 +1392,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 partitionsByHost = info.partitionsByHost();
                 standbyPartitionsByHost = Collections.emptyMap();
                 topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
+                encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
                 break;
             case 6:
                 validateActiveTaskEncoding(partitions, info, logPrefix);
@@ -1396,6 +1401,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 partitionsByHost = info.partitionsByHost();
                 standbyPartitionsByHost = info.standbyPartitionByHost();
                 topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
+                encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
                 break;
             case 7:
                 validateActiveTaskEncoding(partitions, info, logPrefix);
@@ -1404,7 +1410,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 partitionsByHost = info.partitionsByHost();
                 standbyPartitionsByHost = info.standbyPartitionByHost();
                 topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
-                nextProbingRebalanceMs.set(info.nextRebalanceMs());
+                encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
                 break;
             default:
                 throw new IllegalStateException(
@@ -1413,7 +1419,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 );
         }
 
-        verifyHostInfo(partitionsByHost.keySet());
+        maybeScheduleFollowupRebalance(
+            encodedNextScheduledRebalanceMs,
+            receivedAssignmentMetadataVersion,
+            latestCommonlySupportedVersion,
+            partitionsByHost.keySet()
+        );
 
         final Cluster fakeCluster = Cluster.empty().withPartitions(topicToPartitionInfo);
         streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fakeCluster);
@@ -1423,6 +1434,28 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         taskManager.handleAssignment(activeTasks, info.standbyTasks());
     }
 
+    private void maybeScheduleFollowupRebalance(final long encodedNextScheduledRebalanceMs,
+                                                final int receivedAssignmentMetadataVersion,
+                                                final int latestCommonlySupportedVersion,
+                                                final Set<HostInfo> groupHostInfo) {
+        if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
+            log.info("Requested to schedule immediate rebalance due to version probing.");
+            nextScheduledRebalanceMs.set(0L);
+        } else if (!verifyHostInfo(groupHostInfo)) {
+            log.info("Requested to schedule immediate rebalance to update group with new host endpoint = {}.", userEndPoint);
+            nextScheduledRebalanceMs.set(0L);
+        } else if (encodedNextScheduledRebalanceMs == 0L) {
+            log.info("Requested to schedule immediate rebalance for new tasks to be safely revoked from current owner.");
+            nextScheduledRebalanceMs.set(0L);
+        } else if (encodedNextScheduledRebalanceMs < Long.MAX_VALUE) {
+            log.info("Requested to schedule probing rebalance for {} ms.", encodedNextScheduledRebalanceMs);
+            nextScheduledRebalanceMs.set(encodedNextScheduledRebalanceMs);
+        } else {
+            log.info("No followup rebalance was requested, resetting the rebalance schedule.");
+            nextScheduledRebalanceMs.set(Long.MAX_VALUE);
+        }
+    }
+
     /**
      * Verify that this client's host info was included in the map returned in the assignment, and trigger a
      * rebalance if not. This may be necessary when using static membership, as a rejoining client will be handed
@@ -1430,15 +1463,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
      * to force a rebalance for the other members in the group to get the updated host info for this client.
      *
      * @param groupHostInfo the HostInfo of all clients in the group
+     * @return false if the current host info does not match that in the group assignment
      */
-    private void verifyHostInfo(final Set<HostInfo> groupHostInfo) {
+    private boolean verifyHostInfo(final Set<HostInfo> groupHostInfo) {
         if (userEndPoint != null && !groupHostInfo.isEmpty()) {
             final HostInfo myHostInfo = HostInfo.buildFromEndpoint(userEndPoint);
 
-            if (!groupHostInfo.contains(myHostInfo)) {
-                log.info("Triggering a rebalance to update group with new endpoint = {}", userEndPoint);
-                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-            }
+            return groupHostInfo.contains(myHostInfo);
+        } else {
+            return true;
         }
     }
 
@@ -1543,10 +1576,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         }
     }
 
-    protected void setAssignmentErrorCode(final Integer errorCode) {
-        assignmentErrorCode.set(errorCode);
-    }
-
     // following functions are for test only
     void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
         this.internalTopicManager = internalTopicManager;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index 2c85eaa..b594aa6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Time;
@@ -31,23 +32,26 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
     private final TaskManager taskManager;
     private final StreamThread streamThread;
     private final Logger log;
+    private final AtomicInteger assignmentErrorCode;
 
     StreamsRebalanceListener(final Time time,
                              final TaskManager taskManager,
                              final StreamThread streamThread,
-                             final Logger log) {
+                             final Logger log,
+                             final AtomicInteger assignmentErrorCode) {
         this.time = time;
         this.taskManager = taskManager;
         this.streamThread = streamThread;
         this.log = log;
+        this.assignmentErrorCode = assignmentErrorCode;
     }
 
     @Override
     public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         // NB: all task management is already handled by:
         // org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
-        if (streamThread.getAssignmentErrorCode() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
-            log.error("Received error code {} - shutdown", streamThread.getAssignmentErrorCode());
+        if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
+            log.error("Received error code {} - shutdown", assignmentErrorCode.get());
             streamThread.shutdown();
         } else {
             streamThread.setState(State.PARTITIONS_ASSIGNED);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 8ea63fc..4d77804 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -190,8 +190,8 @@ public final class AssignorConfiguration {
         return (AtomicInteger) ai;
     }
 
-    public AtomicLong getNextProbingRebalanceMs(final Map<String, ?> configs) {
-        final Object al = configs.get(InternalConfig.NEXT_PROBING_REBALANCE_MS);
+    public AtomicLong getNextScheduledRebalanceMs(final Map<String, ?> configs) {
+        final Object al = configs.get(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS);
         if (al == null) {
             final KafkaException fatalException = new KafkaException("nextProbingRebalanceMs is not specified");
             log.error(fatalException.getMessage(), fatalException);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
index 259c3db..6234a7d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
@@ -18,9 +18,7 @@ package org.apache.kafka.streams.processor.internals.assignment;
 
 public enum AssignorError {
     NONE(0),
-    INCOMPLETE_SOURCE_TOPIC_METADATA(1),
-    REBALANCE_NEEDED(2);
-
+    INCOMPLETE_SOURCE_TOPIC_METADATA(1);
     private final int code;
 
     AssignorError(final int code) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
index eff4991..b4b9dce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
@@ -52,6 +52,7 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -185,6 +186,7 @@ public class EosBetaUpgradeIntegrationTest {
     }
 
     @Test
+    @Ignore
     public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
         // We use two KafkaStreams clients that we upgrade from eos-alpha to eos-beta. During the upgrade,
         // we ensure that there are pending transaction and verify that data is processed correctly.
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
index 2c27d11..06f866f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
@@ -123,7 +123,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
         configurationMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, streamsMetadataState);
         configurationMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
         configurationMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentError);
-        configurationMap.put(InternalConfig.NEXT_PROBING_REBALANCE_MS, nextProbingRebalanceMs);
+        configurationMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextProbingRebalanceMs);
         configurationMap.put(InternalConfig.TIME, time);
         configurationMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName());
         return configurationMap;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 0a47553..5542067 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -541,7 +541,7 @@ public class StreamThreadTest {
         }
 
         AtomicLong nextRebalanceMs() {
-            return (AtomicLong) consumerConfigs.get(StreamsConfig.InternalConfig.NEXT_PROBING_REBALANCE_MS);
+            return (AtomicLong) consumerConfigs.get(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 840d48e..4c64824 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -47,7 +47,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
@@ -86,6 +85,7 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
@@ -200,7 +200,7 @@ public class StreamsPartitionAssignorTest {
     private final Class<? extends TaskAssignor> taskAssignor;
 
     private final AtomicInteger assignmentError = new AtomicInteger();
-    private final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);
+    private final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
     private final MockTime time = new MockTime();
 
     private Map<String, Object> configProps() {
@@ -211,7 +211,7 @@ public class StreamsPartitionAssignorTest {
         configurationMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, streamsMetadataState);
         configurationMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
         configurationMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentError);
-        configurationMap.put(InternalConfig.NEXT_PROBING_REBALANCE_MS, nextProbingRebalanceMs);
+        configurationMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
         configurationMap.put(InternalConfig.TIME, time);
         configurationMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
         return configurationMap;
@@ -1360,7 +1360,7 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldTriggerRebalanceOnHostInfoChange() {
+    public void shouldTriggerImmediateRebalanceOnHostInfoChange() {
         final Map<HostInfo, Set<TopicPartition>> oldHostState = mkMap(
             mkEntry(new HostInfo("localhost", 9090), mkSet(t1p0, t1p1)),
             mkEntry(new HostInfo("otherhost", 9090), mkSet(t2p0, t2p1))
@@ -1376,14 +1376,44 @@ public class StreamsPartitionAssignorTest {
 
         partitionAssignor.onAssignment(createAssignment(oldHostState), null);
 
-        assertThat(assignmentError.get(), is(AssignorError.REBALANCE_NEEDED.code()));
+        assertThat(nextScheduledRebalanceMs.get(), is(0L));
 
-        partitionAssignor.setAssignmentErrorCode(AssignorError.NONE.code());
         partitionAssignor.onAssignment(createAssignment(newHostState), null);
 
-        assertThat(assignmentError.get(), is(AssignorError.NONE.code()));
+        assertThat(nextScheduledRebalanceMs.get(), is(Long.MAX_VALUE));
+    }
 
-        EasyMock.verify(taskManager);
+    @Test
+    public void shouldTriggerImmediateRebalanceOnTasksRevoked() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+
+        final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
+
+        subscriptions.put(CONSUMER_1,
+                          new Subscription(
+                              Collections.singletonList("topic1"),
+                              getInfo(UUID_1, allTasks, EMPTY_TASKS).encode(),
+                              asList(t1p0, t1p1, t1p2))
+        );
+        subscriptions.put(CONSUMER_2,
+                          new Subscription(
+                              Collections.singletonList("topic1"),
+                              getInfo(UUID_1, EMPTY_TASKS, allTasks).encode(),
+                              emptyList())
+        );
+
+        createMockTaskManager(allTasks, allTasks);
+        configurePartitionAssignorWith(singletonMap(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0L));
+
+        final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Verify at least one of the partitions CONSUMER_1 subscribed with (t1p0, t1p1, t1p2) was revoked; compare partitions to partitions, not to task ids
+        assertThat(assignment.get(CONSUMER_1).partitions(), not(asList(t1p0, t1p1, t1p2)));
+        assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(emptyList()));
+
+        partitionAssignor.onAssignment(assignment.get(CONSUMER_2), null);
+
+        assertThat(nextScheduledRebalanceMs.get(), is(0L));
     }
 
     @Test
@@ -1768,13 +1798,13 @@ public class StreamsPartitionAssignorTest {
 
     @Test
     public void shouldGetNextProbingRebalanceMs() {
-        nextProbingRebalanceMs.set(5 * 60 * 1000L);
+        nextScheduledRebalanceMs.set(5 * 60 * 1000L);
 
         createDefaultMockTaskManager();
         final Map<String, Object> props = configProps();
         final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(props);
 
-        assertThat(assignorConfiguration.getNextProbingRebalanceMs(props).get(), equalTo(5 * 60 * 1000L));
+        assertThat(assignorConfiguration.getNextScheduledRebalanceMs(props).get(), equalTo(5 * 60 * 1000L));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 78903c0..cb8787f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -38,9 +38,11 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.TaskManager;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
 import org.apache.kafka.streams.processor.internals.assignment.LegacySubscriptionInfoSerde;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -57,6 +59,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 
@@ -116,8 +119,10 @@ public class StreamsUpgradeTest {
     }
 
     public static class FutureStreamsPartitionAssignor extends StreamsPartitionAssignor {
+        private final Logger log = LoggerFactory.getLogger(FutureStreamsPartitionAssignor.class);
 
         private AtomicInteger usedSubscriptionMetadataVersionPeek;
+        private AtomicLong nextScheduledRebalanceMs;
 
         public FutureStreamsPartitionAssignor() {
             usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION + 1;
@@ -133,6 +138,8 @@ public class StreamsUpgradeTest {
                 usedSubscriptionMetadataVersionPeek = new AtomicInteger();
             }
             configs.remove("test.future.metadata");
+            nextScheduledRebalanceMs = new AssignorConfiguration(configs).getNextScheduledRebalanceMs(configs);
+
             super.configure(configs);
         }
 
@@ -194,7 +201,8 @@ public class StreamsUpgradeTest {
                 assignment.userData().putInt(0, LATEST_SUPPORTED_VERSION));
 
             if (maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion())) {
-                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
+                log.info("Requested to schedule immediate rebalance due to version probing.");
+                nextScheduledRebalanceMs.set(0L);
                 usedSubscriptionMetadataVersionPeek.set(usedSubscriptionMetadataVersion);
             }
 
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 7e1b27c..0f0719a 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -528,22 +528,22 @@ class StreamsUpgradeTest(Test):
                         log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                                timeout_sec=60,
                                                err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                        log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
+                        log_monitor.wait_until("Triggering the followup rebalance scheduled for 0 ms.",
                                                timeout_sec=60,
-                                               err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
+                                               err_msg="Could not detect 'Triggering followup rebalance' at upgrading node " + str(node.account))
                     else:
                         first_other_monitor.wait_until("Sent a version 7 subscription and group.s latest commonly supported version is 8 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 8 for next rebalance.",
                                                        timeout_sec=60,
                                                        err_msg="Never saw output 'Upgrade metadata to version 8' on" + str(first_other_node.account))
-                        first_other_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
+                        first_other_monitor.wait_until("Triggering the followup rebalance scheduled for 0 ms.",
                                                        timeout_sec=60,
-                                                       err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
+                                                       err_msg="Could not detect 'Triggering followup rebalance' at upgrading node " + str(node.account))
                         second_other_monitor.wait_until("Sent a version 7 subscription and group.s latest commonly supported version is 8 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 8 for next rebalance.",
                                                         timeout_sec=60,
                                                         err_msg="Never saw output 'Upgrade metadata to version 8' on" + str(second_other_node.account))
-                        second_other_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",
+                        second_other_monitor.wait_until("Triggering the followup rebalance scheduled for 0 ms.",
                                                         timeout_sec=60,
-                                                        err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
+                                                        err_msg="Could not detect 'Triggering followup rebalance' at upgrading node " + str(node.account))
 
                     # version probing should trigger second rebalance
                     # now we check that after consecutive rebalances we have synchronized generation