You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2020/10/21 22:26:36 UTC

[kafka] branch 2.7 updated (ab65b8b -> a7ec005)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a change to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git.


 discard ab65b8b  KAFKA-10520; Ensure transactional producers poll if leastLoadedNode not available with max.in.flight=1 (#9406)
 discard f605af8  MINOR: distinguish between missing source topics and internal assignment errors (#9446)
     new a7ec005  MINOR: distinguish between missing source topics and internal assignment errors (#9446)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (ab65b8b)
            \
             N -- N -- N   refs/heads/2.7 (a7ec005)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kafka/clients/producer/internals/Sender.java   |  31 ++---
 .../clients/producer/internals/SenderTest.java     | 136 +--------------------
 .../api/TransactionsWithMaxInFlightOneTest.scala   | 131 --------------------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   4 +-
 .../internals/StreamsPartitionAssignorTest.java    |   2 +-
 5 files changed, 14 insertions(+), 290 deletions(-)
 delete mode 100644 core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala


[kafka] 01/01: MINOR: distinguish between missing source topics and internal assignment errors (#9446)

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a7ec00514bac6239b3896293831f854cc92c803f
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Wed Oct 21 09:06:25 2020 -0700

    MINOR: distinguish between missing source topics and internal assignment errors (#9446)
    
    Introduce an ASSIGNMENT_ERROR code to distinguish from INCOMPLETE_SOURCE_TOPIC_METADATA and shut down all members in case of an unexpected exception during task assignment.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>,  John Roesler <vv...@apache.org>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../internals/StreamsPartitionAssignor.java        | 115 +++++++++---------
 .../internals/StreamsRebalanceListener.java        |  11 +-
 .../internals/assignment/AssignorError.java        |   6 +-
 .../internals/StreamsPartitionAssignorTest.java    | 134 ++++++++++++++++++---
 .../internals/StreamsRebalanceListenerTest.java    |  37 ++++++
 6 files changed, 227 insertions(+), 78 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c5c7f1f..d9eff63 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -144,7 +144,7 @@
 
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
-              files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
+              files="(KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
 
     <suppress checks="MethodLength"
               files="KTableImpl.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 4228531..4004f51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.MissingSourceTopicException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.TaskId;
@@ -335,75 +336,61 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             clientMetadata.addPreviousTasksAndOffsetSums(consumerId, info.taskOffsetSums());
         }
 
-        final boolean versionProbing =
-            checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);
+        try {
+            final boolean versionProbing =
+                checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);
 
-        log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
+            log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
 
-        // ---------------- Step One ---------------- //
+            // ---------------- Step One ---------------- //
 
-        // parse the topology to determine the repartition source topics,
-        // making sure they are created with the number of partitions as
-        // the maximum of the depending sub-topologies source topics' number of partitions
-        final Map<Integer, TopicsInfo> topicGroups = taskManager.builder().topicGroups();
+            // parse the topology to determine the repartition source topics,
+            // making sure they are created with the number of partitions as
+            // the maximum of the depending sub-topologies source topics' number of partitions
+            final Map<Integer, TopicsInfo> topicGroups = taskManager.builder().topicGroups();
 
-        final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions;
-        try {
-            allRepartitionTopicPartitions = prepareRepartitionTopics(topicGroups, metadata);
-        } catch (final TaskAssignmentException e) {
-            return new GroupAssignment(
-                errorAssignment(clientMetadataMap,
-                    AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
-            );
-        }
+            final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(topicGroups, metadata);
 
-        final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
+            final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
 
-        log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
+            log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
 
-        // ---------------- Step Two ---------------- //
+            // ---------------- Step Two ---------------- //
 
-        // construct the assignment of tasks to clients
+            // construct the assignment of tasks to clients
 
-        final Set<String> allSourceTopics = new HashSet<>();
-        final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
-        for (final Map.Entry<Integer, TopicsInfo> entry : topicGroups.entrySet()) {
-            allSourceTopics.addAll(entry.getValue().sourceTopics);
-            sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
-        }
+            final Set<String> allSourceTopics = new HashSet<>();
+            final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
+            for (final Map.Entry<Integer, TopicsInfo> entry : topicGroups.entrySet()) {
+                allSourceTopics.addAll(entry.getValue().sourceTopics);
+                sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
+            }
 
-        // get the tasks as partition groups from the partition grouper
-        final Map<TaskId, Set<TopicPartition>> partitionsForTask =
-            partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
+            // get the tasks as partition groups from the partition grouper
+            final Map<TaskId, Set<TopicPartition>> partitionsForTask =
+                partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
 
-        final Set<TaskId> statefulTasks = new HashSet<>();
+            final Set<TaskId> statefulTasks = new HashSet<>();
 
-        final boolean probingRebalanceNeeded;
-        try {
-            probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
-        } catch (final TaskAssignmentException e) {
-            return new GroupAssignment(
-                errorAssignment(clientMetadataMap,
-                    AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
-            );
-        }
+            final boolean probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
 
-        // ---------------- Step Three ---------------- //
 
-        // construct the global partition assignment per host map
+            // ---------------- Step Three ---------------- //
 
-        final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();
-        final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new HashMap<>();
-        if (minReceivedMetadataVersion >= 2) {
-            populatePartitionsByHostMaps(partitionsByHost, standbyPartitionsByHost, partitionsForTask, clientMetadataMap);
-        }
-        streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fullMetadata);
+            // construct the global partition assignment per host map
+
+            final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();
+            final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new HashMap<>();
+            if (minReceivedMetadataVersion >= 2) {
+                populatePartitionsByHostMaps(partitionsByHost, standbyPartitionsByHost, partitionsForTask, clientMetadataMap);
+            }
+            streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fullMetadata);
 
-        // ---------------- Step Four ---------------- //
+            // ---------------- Step Four ---------------- //
 
-        // compute the assignment of tasks to threads within each client and build the final group assignment
+            // compute the assignment of tasks to threads within each client and build the final group assignment
 
-        final Map<String, Assignment> assignment = computeNewAssignment(
+            final Map<String, Assignment> assignment = computeNewAssignment(
                 statefulTasks,
                 clientMetadataMap,
                 partitionsForTask,
@@ -414,9 +401,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 minSupportedMetadataVersion,
                 versionProbing,
                 probingRebalanceNeeded
-        );
+            );
 
-        return new GroupAssignment(assignment);
+            return new GroupAssignment(assignment);
+        } catch (final MissingSourceTopicException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );
+        } catch (final TaskAssignmentException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())
+            );
+        }
     }
 
     /**
@@ -439,7 +435,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 minSupportedMetadataVersion);
 
         } else {
-            throw new IllegalStateException(
+            throw new TaskAssignmentException(
                 "Received a future (version probing) subscription (version: " + futureMetadataVersion
                     + ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion
                     + ") at the same time."
@@ -471,7 +467,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                     log.error("Source topic {} is missing/unknown during rebalance, please make sure all source topics " +
                                   "have been pre-created before starting the Streams application. Returning error {}",
                                   topic, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
-                    throw new TaskAssignmentException("Missing source topic during assignment.");
+                    throw new MissingSourceTopicException("Missing source topic during assignment.");
                 }
             }
             for (final InternalTopicConfig topic : topicsInfo.repartitionSourceTopics.values()) {
@@ -554,7 +550,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                                     } else {
                                         final Integer count = metadata.partitionCountForTopic(sourceTopicName);
                                         if (count == null) {
-                                            throw new IllegalStateException(
+                                            throw new TaskAssignmentException(
                                                 "No partition count found for source topic "
                                                     + sourceTopicName
                                                     + ", but it should have been."
@@ -709,8 +705,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                                          final Map<UUID, ClientMetadata> clientMetadataMap,
                                          final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                          final Set<TaskId> statefulTasks) {
-        if (!statefulTasks.isEmpty())
-            throw new IllegalArgumentException("The stateful tasks should not be populated before assigning tasks to clients");
+        if (!statefulTasks.isEmpty()) {
+            throw new TaskAssignmentException("The stateful tasks should not be populated before assigning tasks to clients");
+        }
 
         final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
         final Map<Integer, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
@@ -1240,7 +1237,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                         consumersToFill.offer(consumer);
                     }
                 } else {
-                    throw new IllegalStateException("Ran out of unassigned stateful tasks but some members were not at capacity");
+                    throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
                 }
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index f794f08..a46a652 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.MissingSourceTopicException;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.internals.StreamThread.State;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.slf4j.Logger;
@@ -52,8 +53,16 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
         // NB: all task management is already handled by:
         // org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
         if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
-            log.error("Received error code {}", assignmentErrorCode.get());
+            log.error("Received error code {}", AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA);
             throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
+        } else if (assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) {
+            log.info("Received version probing code {}", AssignorError.VERSION_PROBING);
+        }  else if (assignmentErrorCode.get() == AssignorError.ASSIGNMENT_ERROR.code()) {
+            log.error("Received error code {}", AssignorError.ASSIGNMENT_ERROR);
+            throw new TaskAssignmentException("Hit an unexpected exception during task assignment phase of rebalance");
+        } else if (assignmentErrorCode.get() != AssignorError.NONE.code()) {
+            log.error("Received unknown error code {}", assignmentErrorCode.get());
+            throw new TaskAssignmentException("Hit an unrecognized exception during rebalance");
         }
 
         streamThread.setState(State.PARTITIONS_ASSIGNED);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
index 6234a7d..44e15fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
@@ -17,8 +17,12 @@
 package org.apache.kafka.streams.processor.internals.assignment;
 
 public enum AssignorError {
+    // Note: this error code should be reserved for fatal errors, as the receiving clients are future-proofed
+    // to throw an exception upon an unrecognized error code.
     NONE(0),
-    INCOMPLETE_SOURCE_TOPIC_METADATA(1);
+    INCOMPLETE_SOURCE_TOPIC_METADATA(1),
+    VERSION_PROBING(2), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
+    ASSIGNMENT_ERROR(3);
     private final int code;
 
     AssignorError(final int code) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index fd44b2f..45d150a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.Properties;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
@@ -33,6 +34,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
@@ -49,6 +51,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
@@ -166,7 +169,8 @@ public class StreamsPartitionAssignorTest {
         Collections.singletonList(Node.noNode()),
         infos,
         emptySet(),
-        emptySet());
+        emptySet()
+    );
 
     private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
     private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
@@ -1182,6 +1186,76 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
+    public void shouldThrowTimeoutExceptionWhenCreatingRepartitionTopicsTimesOut() {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        streamsBuilder.stream("topic1").repartition();
+
+        final String client = "client1";
+        builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
+
+        createDefaultMockTaskManager();
+        EasyMock.replay(taskManager);
+        partitionAssignor.configure(configProps());
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            time,
+            new StreamsConfig(configProps()),
+            mockClientSupplier.restoreConsumer,
+            false
+        ) {
+            @Override
+            public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
+                throw new TimeoutException("KABOOM!");
+            }
+        };
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+        subscriptions.put(client,
+                          new Subscription(
+                              singletonList("topic1"),
+                              defaultSubscriptionInfo.encode()
+                          )
+        );
+        assertThrows(TimeoutException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
+    }
+
+    @Test
+    public void shouldThrowTimeoutExceptionWhenCreatingChangelogTopicsTimesOut() {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        streamsBuilder.table("topic1", Materialized.as("store"));
+
+        final String client = "client1";
+        builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
+
+        createDefaultMockTaskManager();
+        EasyMock.replay(taskManager);
+        partitionAssignor.configure(configProps());
+        final MockInternalTopicManager mockInternalTopicManager =  new MockInternalTopicManager(
+            time,
+            new StreamsConfig(configProps()),
+            mockClientSupplier.restoreConsumer,
+            false
+        ) {
+            @Override
+            public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
+                if (topics.isEmpty()) {
+                    return emptySet();
+                }
+                throw new TimeoutException("KABOOM!");
+            }
+        };
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+        subscriptions.put(client,
+            new Subscription(
+                singletonList("topic1"),
+                defaultSubscriptionInfo.encode()
+            )
+        );
+
+        assertThrows(TimeoutException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
+    }
+
+    @Test
     public void shouldAddUserDefinedEndPointToSubscription() {
         builder.addSource(null, "source", null, null, null, "input");
         builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
@@ -1652,13 +1726,13 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldThrowIfV1SubscriptionAndFutureSubscriptionIsMixed() {
-        shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1);
+    public void shouldEncodeAssignmentErrorIfV1SubscriptionAndFutureSubscriptionIsMixed() {
+        shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1);
     }
 
     @Test
-    public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() {
-        shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2);
+    public void shouldEncodeAssignmentErrorIfV2SubscriptionAndFutureSubscriptionIsMixed() {
+        shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2);
     }
 
     @Test
@@ -1871,7 +1945,7 @@ public class StreamsPartitionAssignorTest {
         );
         final Capture<Map<TopicPartition, OffsetSpec>> capturedChangelogs = EasyMock.newCapture();
 
-        expect(adminClient.listOffsets(EasyMock.capture(capturedChangelogs))).andStubReturn(result);
+        expect(adminClient.listOffsets(EasyMock.capture(capturedChangelogs))).andReturn(result).once();
         expect(result.all()).andReturn(allFuture);
 
         builder.addSource(null, "source1", null, null, null, "topic1");
@@ -1900,29 +1974,32 @@ public class StreamsPartitionAssignorTest {
     @Test
     public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
         final Set<TopicPartition> changelogs = mkSet(
-            new TopicPartition(APPLICATION_ID + "-store-changelog", 0),
-            new TopicPartition(APPLICATION_ID + "-store-changelog", 1),
-            new TopicPartition(APPLICATION_ID + "-store-changelog", 2)
+            new TopicPartition("topic1", 0),
+            new TopicPartition("topic1", 1),
+            new TopicPartition("topic1", 2)
         );
 
         final StreamsBuilder streamsBuilder = new StreamsBuilder();
         streamsBuilder.table("topic1", Materialized.as("store"));
 
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build(props));
+
         subscriptions.put("consumer10",
             new Subscription(
                 singletonList("topic1"),
                 defaultSubscriptionInfo.encode()
             ));
 
-        final Consumer<byte[], byte[]> consumerClient = EasyMock.createMock(Consumer.class);
-
         createDefaultMockTaskManager();
+        final Consumer<byte[], byte[]> consumerClient = EasyMock.createMock(Consumer.class);
         EasyMock.expect(taskManager.mainConsumer()).andStubReturn(consumerClient);
         configurePartitionAssignorWith(singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE));
         overwriteInternalTopicManagerWithMock(false);
 
-        EasyMock.expect(consumerClient.committed(changelogs))
-            .andStubReturn(changelogs.stream().collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(Long.MAX_VALUE))));
+        EasyMock.expect(consumerClient.committed(EasyMock.eq(changelogs)))
+            .andReturn(changelogs.stream().collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(Long.MAX_VALUE)))).once();
 
         EasyMock.replay(consumerClient);
         partitionAssignor.assign(metadata, new GroupSubscription(subscriptions));
@@ -1931,6 +2008,29 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
+    public void shouldEncodeMissingSourceTopicError() {
+        final Cluster emptyClusterMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            emptyList(),
+            emptySet(),
+            emptySet()
+        );
+
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        configureDefault();
+
+        subscriptions.put("consumer",
+                          new Subscription(
+                              singletonList("topic"),
+                              defaultSubscriptionInfo.encode()
+                          ));
+        final Map<String, Assignment> assignments = partitionAssignor.assign(emptyClusterMetadata, new GroupSubscription(subscriptions)).groupAssignment();
+        assertThat(AssignmentInfo.decode(assignments.get("consumer").userData()).errCode(),
+                   equalTo(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()));
+    }
+
+    @Test
     public void testUniqueField() {
         createDefaultMockTaskManager();
         configureDefaultPartitionAssignor();
@@ -1941,7 +2041,6 @@ public class StreamsPartitionAssignorTest {
         assertEquals(1, partitionAssignor.uniqueField());
         partitionAssignor.subscriptionUserData(topics);
         assertEquals(2, partitionAssignor.uniqueField());
-
     }
 
     @Test
@@ -1965,7 +2064,7 @@ public class StreamsPartitionAssignorTest {
         return buf;
     }
 
-    private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) {
+    private void shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) {
         subscriptions.put("consumer1",
                           new Subscription(
                               Collections.singletonList("topic1"),
@@ -1978,7 +2077,10 @@ public class StreamsPartitionAssignorTest {
         );
         configureDefault();
 
-        assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
+        final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).errCode(), equalTo(AssignorError.ASSIGNMENT_ERROR.code()));
+        assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()).errCode(), equalTo(AssignorError.ASSIGNMENT_ERROR.code()));
     }
 
     private static Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
index b8ccc94..aecab44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.errors.MissingSourceTopicException;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.internals.StreamThread.State;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.junit.Before;
@@ -71,6 +72,42 @@ public class StreamsRebalanceListenerTest {
     }
 
     @Test
+    public void shouldSwallowVersionProbingError() {
+        expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andStubReturn(State.PARTITIONS_REVOKED);
+        taskManager.handleRebalanceComplete();
+        replay(taskManager, streamThread);
+        assignmentErrorCode.set(AssignorError.VERSION_PROBING.code());
+        streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
+        verify(taskManager, streamThread);
+    }
+
+    @Test
+    public void shouldThrowTaskAssignmentException() {
+        replay(taskManager, streamThread);
+        assignmentErrorCode.set(AssignorError.ASSIGNMENT_ERROR.code());
+
+        final TaskAssignmentException exception = assertThrows(
+            TaskAssignmentException.class,
+            () -> streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList())
+        );
+        assertThat(exception.getMessage(), is("Hit an unexpected exception during task assignment phase of rebalance"));
+        verify(taskManager, streamThread);
+    }
+
+    @Test
+    public void shouldThrowTaskAssignmentExceptionOnUnrecognizedErrorCode() {
+        replay(taskManager, streamThread);
+        assignmentErrorCode.set(Integer.MAX_VALUE);
+
+        final TaskAssignmentException exception = assertThrows(
+            TaskAssignmentException.class,
+            () -> streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList())
+        );
+        assertThat(exception.getMessage(), is("Hit an unrecognized exception during rebalance"));
+        verify(taskManager, streamThread);
+    }
+
+    @Test
     public void shouldHandleAssignedPartitions() {
         taskManager.handleRebalanceComplete();
         expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andReturn(State.RUNNING);