You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2020/10/21 22:26:37 UTC

[kafka] 01/01: MINOR: distinguish between missing source topics and internal assignment errors (#9446)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a7ec00514bac6239b3896293831f854cc92c803f
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Wed Oct 21 09:06:25 2020 -0700

    MINOR: distinguish between missing source topics and internal assignment errors (#9446)
    
    Introduce an ASSIGNMENT_ERROR code to distinguish internal assignment errors from INCOMPLETE_SOURCE_TOPIC_METADATA (missing source topics), and shut down all members in case of an unexpected exception during task assignment.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, John Roesler <vv...@apache.org>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../internals/StreamsPartitionAssignor.java        | 115 +++++++++---------
 .../internals/StreamsRebalanceListener.java        |  11 +-
 .../internals/assignment/AssignorError.java        |   6 +-
 .../internals/StreamsPartitionAssignorTest.java    | 134 ++++++++++++++++++---
 .../internals/StreamsRebalanceListenerTest.java    |  37 ++++++
 6 files changed, 227 insertions(+), 78 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c5c7f1f..d9eff63 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -144,7 +144,7 @@
 
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
-              files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
+              files="(KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
 
     <suppress checks="MethodLength"
               files="KTableImpl.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 4228531..4004f51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.MissingSourceTopicException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.TaskId;
@@ -335,75 +336,61 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             clientMetadata.addPreviousTasksAndOffsetSums(consumerId, info.taskOffsetSums());
         }
 
-        final boolean versionProbing =
-            checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);
+        try {
+            final boolean versionProbing =
+                checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);
 
-        log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
+            log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
 
-        // ---------------- Step One ---------------- //
+            // ---------------- Step One ---------------- //
 
-        // parse the topology to determine the repartition source topics,
-        // making sure they are created with the number of partitions as
-        // the maximum of the depending sub-topologies source topics' number of partitions
-        final Map<Integer, TopicsInfo> topicGroups = taskManager.builder().topicGroups();
+            // parse the topology to determine the repartition source topics,
+            // making sure they are created with the number of partitions as
+            // the maximum of the depending sub-topologies source topics' number of partitions
+            final Map<Integer, TopicsInfo> topicGroups = taskManager.builder().topicGroups();
 
-        final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions;
-        try {
-            allRepartitionTopicPartitions = prepareRepartitionTopics(topicGroups, metadata);
-        } catch (final TaskAssignmentException e) {
-            return new GroupAssignment(
-                errorAssignment(clientMetadataMap,
-                    AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
-            );
-        }
+            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(topicGroups, metadata);
 
-        final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
+            final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
 
-        log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
+            log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
 
-        // ---------------- Step Two ---------------- //
+            // ---------------- Step Two ---------------- //
 
-        // construct the assignment of tasks to clients
+            // construct the assignment of tasks to clients
 
-        final Set<String> allSourceTopics = new HashSet<>();
-        final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
-        for (final Map.Entry<Integer, TopicsInfo> entry : topicGroups.entrySet()) {
-            allSourceTopics.addAll(entry.getValue().sourceTopics);
-            sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
-        }
+            final Set<String> allSourceTopics = new HashSet<>();
+            final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
+            for (final Map.Entry<Integer, TopicsInfo> entry : topicGroups.entrySet()) {
+                allSourceTopics.addAll(entry.getValue().sourceTopics);
+                sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
+            }
 
-        // get the tasks as partition groups from the partition grouper
-        final Map<TaskId, Set<TopicPartition>> partitionsForTask =
-            partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
+            // get the tasks as partition groups from the partition grouper
+            final Map<TaskId, Set<TopicPartition>> partitionsForTask =
+                partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
 
-        final Set<TaskId> statefulTasks = new HashSet<>();
+            final Set<TaskId> statefulTasks = new HashSet<>();
 
-        final boolean probingRebalanceNeeded;
-        try {
-            probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
-        } catch (final TaskAssignmentException e) {
-            return new GroupAssignment(
-                errorAssignment(clientMetadataMap,
-                    AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
-            );
-        }
+            final boolean probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
 
-        // ---------------- Step Three ---------------- //
 
-        // construct the global partition assignment per host map
+            // ---------------- Step Three ---------------- //
 
-        final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();
-        final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new HashMap<>();
-        if (minReceivedMetadataVersion >= 2) {
-            populatePartitionsByHostMaps(partitionsByHost, standbyPartitionsByHost, partitionsForTask, clientMetadataMap);
-        }
-        streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fullMetadata);
+            // construct the global partition assignment per host map
+
+            final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();
+            final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new HashMap<>();
+            if (minReceivedMetadataVersion >= 2) {
+                populatePartitionsByHostMaps(partitionsByHost, standbyPartitionsByHost, partitionsForTask, clientMetadataMap);
+            }
+            streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fullMetadata);
 
-        // ---------------- Step Four ---------------- //
+            // ---------------- Step Four ---------------- //
 
-        // compute the assignment of tasks to threads within each client and build the final group assignment
+            // compute the assignment of tasks to threads within each client and build the final group assignment
 
-        final Map<String, Assignment> assignment = computeNewAssignment(
+            final Map<String, Assignment> assignment = computeNewAssignment(
                 statefulTasks,
                 clientMetadataMap,
                 partitionsForTask,
@@ -414,9 +401,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 minSupportedMetadataVersion,
                 versionProbing,
                 probingRebalanceNeeded
-        );
+            );
 
-        return new GroupAssignment(assignment);
+            return new GroupAssignment(assignment);
+        } catch (final MissingSourceTopicException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );
+        } catch (final TaskAssignmentException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())
+            );
+        }
     }
 
     /**
@@ -439,7 +435,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 minSupportedMetadataVersion);
 
         } else {
-            throw new IllegalStateException(
+            throw new TaskAssignmentException(
                 "Received a future (version probing) subscription (version: " + futureMetadataVersion
                     + ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion
                     + ") at the same time."
@@ -471,7 +467,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                     log.error("Source topic {} is missing/unknown during rebalance, please make sure all source topics " +
                                   "have been pre-created before starting the Streams application. Returning error {}",
                                   topic, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
-                    throw new TaskAssignmentException("Missing source topic during assignment.");
+                    throw new MissingSourceTopicException("Missing source topic during assignment.");
                 }
             }
             for (final InternalTopicConfig topic : topicsInfo.repartitionSourceTopics.values()) {
@@ -554,7 +550,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                                     } else {
                                         final Integer count = metadata.partitionCountForTopic(sourceTopicName);
                                         if (count == null) {
-                                            throw new IllegalStateException(
+                                            throw new TaskAssignmentException(
                                                 "No partition count found for source topic "
                                                     + sourceTopicName
                                                     + ", but it should have been."
@@ -709,8 +705,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                                          final Map<UUID, ClientMetadata> clientMetadataMap,
                                          final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                          final Set<TaskId> statefulTasks) {
-        if (!statefulTasks.isEmpty())
-            throw new IllegalArgumentException("The stateful tasks should not be populated before assigning tasks to clients");
+        if (!statefulTasks.isEmpty()) {
+            throw new TaskAssignmentException("The stateful tasks should not be populated before assigning tasks to clients");
+        }
 
         final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
         final Map<Integer, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
@@ -1240,7 +1237,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                         consumersToFill.offer(consumer);
                     }
                 } else {
-                    throw new IllegalStateException("Ran out of unassigned stateful tasks but some members were not at capacity");
+                    throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index f794f08..a46a652 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.MissingSourceTopicException;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.internals.StreamThread.State;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.slf4j.Logger;
@@ -52,8 +53,16 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
         // NB: all task management is already handled by:
         // org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
         if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
-            log.error("Received error code {}", assignmentErrorCode.get());
+            log.error("Received error code {}", AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA);
             throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
+        } else if (assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) {
+            log.info("Received version probing code {}", AssignorError.VERSION_PROBING);
+        }  else if (assignmentErrorCode.get() == AssignorError.ASSIGNMENT_ERROR.code()) {
+            log.error("Received error code {}", AssignorError.ASSIGNMENT_ERROR);
+            throw new TaskAssignmentException("Hit an unexpected exception during task assignment phase of rebalance");
+        } else if (assignmentErrorCode.get() != AssignorError.NONE.code()) {
+            log.error("Received unknown error code {}", assignmentErrorCode.get());
+            throw new TaskAssignmentException("Hit an unrecognized exception during rebalance");
         }
 
         streamThread.setState(State.PARTITIONS_ASSIGNED);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
index 6234a7d..44e15fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
@@ -17,8 +17,12 @@
 package org.apache.kafka.streams.processor.internals.assignment;
 
 public enum AssignorError {
+    // Note: the error codes in this enum should be reserved for fatal errors, as the receiving clients are
+    // future-proofed to throw an exception upon an unrecognized error code.
     NONE(0),
-    INCOMPLETE_SOURCE_TOPIC_METADATA(1);
+    INCOMPLETE_SOURCE_TOPIC_METADATA(1),
+    VERSION_PROBING(2), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
+    ASSIGNMENT_ERROR(3);
     private final int code;
 
     AssignorError(final int code) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index fd44b2f..45d150a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.Properties;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
@@ -33,6 +34,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
@@ -49,6 +51,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
@@ -166,7 +169,8 @@ public class StreamsPartitionAssignorTest {
         Collections.singletonList(Node.noNode()),
         infos,
         emptySet(),
-        emptySet());
+        emptySet()
+    );
 
     private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
     private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
@@ -1182,6 +1186,76 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
+    public void shouldThrowTimeoutExceptionWhenCreatingRepartitionTopicsTimesOut() {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        streamsBuilder.stream("topic1").repartition();
+
+        final String client = "client1";
+        builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
+
+        createDefaultMockTaskManager();
+        EasyMock.replay(taskManager);
+        partitionAssignor.configure(configProps());
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            time,
+            new StreamsConfig(configProps()),
+            mockClientSupplier.restoreConsumer,
+            false
+        ) {
+            @Override
+            public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
+                throw new TimeoutException("KABOOM!");
+            }
+        };
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+        subscriptions.put(client,
+                          new Subscription(
+                              singletonList("topic1"),
+                              defaultSubscriptionInfo.encode()
+                          )
+        );
+        assertThrows(TimeoutException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
+    }
+
+    @Test
+    public void shouldThrowTimeoutExceptionWhenCreatingChangelogTopicsTimesOut() {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        streamsBuilder.table("topic1", Materialized.as("store"));
+
+        final String client = "client1";
+        builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
+
+        createDefaultMockTaskManager();
+        EasyMock.replay(taskManager);
+        partitionAssignor.configure(configProps());
+        final MockInternalTopicManager mockInternalTopicManager =  new MockInternalTopicManager(
+            time,
+            new StreamsConfig(configProps()),
+            mockClientSupplier.restoreConsumer,
+            false
+        ) {
+            @Override
+            public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
+                if (topics.isEmpty()) {
+                    return emptySet();
+                }
+                throw new TimeoutException("KABOOM!");
+            }
+        };
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+        subscriptions.put(client,
+            new Subscription(
+                singletonList("topic1"),
+                defaultSubscriptionInfo.encode()
+            )
+        );
+
+        assertThrows(TimeoutException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
+    }
+
+    @Test
     public void shouldAddUserDefinedEndPointToSubscription() {
         builder.addSource(null, "source", null, null, null, "input");
         builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
@@ -1652,13 +1726,13 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldThrowIfV1SubscriptionAndFutureSubscriptionIsMixed() {
-        shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1);
+    public void shouldEncodeAssignmentErrorIfV1SubscriptionAndFutureSubscriptionIsMixed() {
+        shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1);
     }
 
     @Test
-    public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() {
-        shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2);
+    public void shouldEncodeAssignmentErrorIfV2SubscriptionAndFutureSubscriptionIsMixed() {
+        shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2);
     }
 
     @Test
@@ -1871,7 +1945,7 @@ public class StreamsPartitionAssignorTest {
         );
         final Capture<Map<TopicPartition, OffsetSpec>> capturedChangelogs = EasyMock.newCapture();
 
-        expect(adminClient.listOffsets(EasyMock.capture(capturedChangelogs))).andStubReturn(result);
+        expect(adminClient.listOffsets(EasyMock.capture(capturedChangelogs))).andReturn(result).once();
         expect(result.all()).andReturn(allFuture);
 
         builder.addSource(null, "source1", null, null, null, "topic1");
@@ -1900,29 +1974,32 @@ public class StreamsPartitionAssignorTest {
     @Test
     public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
         final Set<TopicPartition> changelogs = mkSet(
-            new TopicPartition(APPLICATION_ID + "-store-changelog", 0),
-            new TopicPartition(APPLICATION_ID + "-store-changelog", 1),
-            new TopicPartition(APPLICATION_ID + "-store-changelog", 2)
+            new TopicPartition("topic1", 0),
+            new TopicPartition("topic1", 1),
+            new TopicPartition("topic1", 2)
         );
 
         final StreamsBuilder streamsBuilder = new StreamsBuilder();
         streamsBuilder.table("topic1", Materialized.as("store"));
 
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build(props));
+
         subscriptions.put("consumer10",
             new Subscription(
                 singletonList("topic1"),
                 defaultSubscriptionInfo.encode()
             ));
 
-        final Consumer<byte[], byte[]> consumerClient = EasyMock.createMock(Consumer.class);
-
         createDefaultMockTaskManager();
+        final Consumer<byte[], byte[]> consumerClient = EasyMock.createMock(Consumer.class);
         EasyMock.expect(taskManager.mainConsumer()).andStubReturn(consumerClient);
         configurePartitionAssignorWith(singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE));
         overwriteInternalTopicManagerWithMock(false);
 
-        EasyMock.expect(consumerClient.committed(changelogs))
-            .andStubReturn(changelogs.stream().collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(Long.MAX_VALUE))));
+        EasyMock.expect(consumerClient.committed(EasyMock.eq(changelogs)))
+            .andReturn(changelogs.stream().collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(Long.MAX_VALUE)))).once();
 
         EasyMock.replay(consumerClient);
         partitionAssignor.assign(metadata, new GroupSubscription(subscriptions));
@@ -1931,6 +2008,29 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
+    public void shouldEncodeMissingSourceTopicError() {
+        final Cluster emptyClusterMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            emptyList(),
+            emptySet(),
+            emptySet()
+        );
+
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        configureDefault();
+
+        subscriptions.put("consumer",
+                          new Subscription(
+                              singletonList("topic"),
+                              defaultSubscriptionInfo.encode()
+                          ));
+        final Map<String, Assignment> assignments = partitionAssignor.assign(emptyClusterMetadata, new GroupSubscription(subscriptions)).groupAssignment();
+        assertThat(AssignmentInfo.decode(assignments.get("consumer").userData()).errCode(),
+                   equalTo(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()));
+    }
+
+    @Test
     public void testUniqueField() {
         createDefaultMockTaskManager();
         configureDefaultPartitionAssignor();
@@ -1941,7 +2041,6 @@ public class StreamsPartitionAssignorTest {
         assertEquals(1, partitionAssignor.uniqueField());
         partitionAssignor.subscriptionUserData(topics);
         assertEquals(2, partitionAssignor.uniqueField());
-
     }
 
     @Test
@@ -1965,7 +2064,7 @@ public class StreamsPartitionAssignorTest {
         return buf;
     }
 
-    private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) {
+    private void shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) {
         subscriptions.put("consumer1",
                           new Subscription(
                               Collections.singletonList("topic1"),
@@ -1978,7 +2077,10 @@ public class StreamsPartitionAssignorTest {
         );
         configureDefault();
 
-        assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
+        final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).errCode(), equalTo(AssignorError.ASSIGNMENT_ERROR.code()));
+        assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()).errCode(), equalTo(AssignorError.ASSIGNMENT_ERROR.code()));
     }
 
     private static Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
index b8ccc94..aecab44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.errors.MissingSourceTopicException;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.internals.StreamThread.State;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.junit.Before;
@@ -71,6 +72,42 @@ public class StreamsRebalanceListenerTest {
     }
 
     @Test
+    public void shouldSwallowVersionProbingError() {
+        expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andStubReturn(State.PARTITIONS_REVOKED);
+        taskManager.handleRebalanceComplete();
+        replay(taskManager, streamThread);
+        assignmentErrorCode.set(AssignorError.VERSION_PROBING.code());
+        streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
+        verify(taskManager, streamThread);
+    }
+
+    @Test
+    public void shouldThrowTaskAssignmentException() {
+        replay(taskManager, streamThread);
+        assignmentErrorCode.set(AssignorError.ASSIGNMENT_ERROR.code());
+
+        final TaskAssignmentException exception = assertThrows(
+            TaskAssignmentException.class,
+            () -> streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList())
+        );
+        assertThat(exception.getMessage(), is("Hit an unexpected exception during task assignment phase of rebalance"));
+        verify(taskManager, streamThread);
+    }
+
+    @Test
+    public void shouldThrowTaskAssignmentExceptionOnUnrecognizedErrorCode() {
+        replay(taskManager, streamThread);
+        assignmentErrorCode.set(Integer.MAX_VALUE);
+
+        final TaskAssignmentException exception = assertThrows(
+            TaskAssignmentException.class,
+            () -> streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList())
+        );
+        assertThat(exception.getMessage(), is("Hit an unrecognized exception during rebalance"));
+        verify(taskManager, streamThread);
+    }
+
+    @Test
     public void shouldHandleAssignedPartitions() {
         taskManager.handleRebalanceComplete();
         expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andReturn(State.RUNNING);