You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2020/10/21 22:26:37 UTC
[kafka] 01/01: MINOR: distinguish between missing source topics and
internal assignment errors (#9446)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit a7ec00514bac6239b3896293831f854cc92c803f
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Wed Oct 21 09:06:25 2020 -0700
MINOR: distinguish between missing source topics and internal assignment errors (#9446)
Introduce an ASSIGNMENT_ERROR code to distinguish unexpected internal assignment failures from INCOMPLETE_SOURCE_TOPIC_METADATA, and shut down all members when an unexpected exception occurs during task assignment.
Reviewers: Matthias J. Sax <mj...@apache.org>, John Roesler <vv...@apache.org>
---
checkstyle/suppressions.xml | 2 +-
.../internals/StreamsPartitionAssignor.java | 115 +++++++++---------
.../internals/StreamsRebalanceListener.java | 11 +-
.../internals/assignment/AssignorError.java | 6 +-
.../internals/StreamsPartitionAssignorTest.java | 134 ++++++++++++++++++---
.../internals/StreamsRebalanceListenerTest.java | 37 ++++++
6 files changed, 227 insertions(+), 78 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c5c7f1f..d9eff63 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -144,7 +144,7 @@
<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
- files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
+ files="(KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
<suppress checks="MethodLength"
files="KTableImpl.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 4228531..4004f51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
@@ -335,75 +336,61 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
clientMetadata.addPreviousTasksAndOffsetSums(consumerId, info.taskOffsetSums());
}
- final boolean versionProbing =
- checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);
+ try {
+ final boolean versionProbing =
+ checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);
- log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
+ log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
- // ---------------- Step One ---------------- //
+ // ---------------- Step One ---------------- //
- // parse the topology to determine the repartition source topics,
- // making sure they are created with the number of partitions as
- // the maximum of the depending sub-topologies source topics' number of partitions
- final Map<Integer, TopicsInfo> topicGroups = taskManager.builder().topicGroups();
+ // parse the topology to determine the repartition source topics,
+ // making sure they are created with the number of partitions as
+ // the maximum of the depending sub-topologies source topics' number of partitions
+ final Map<Integer, TopicsInfo> topicGroups = taskManager.builder().topicGroups();
- final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions;
- try {
- allRepartitionTopicPartitions = prepareRepartitionTopics(topicGroups, metadata);
- } catch (final TaskAssignmentException e) {
- return new GroupAssignment(
- errorAssignment(clientMetadataMap,
- AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
- );
- }
+ final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(topicGroups, metadata);
- final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
+ final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
- log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
+ log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
- // ---------------- Step Two ---------------- //
+ // ---------------- Step Two ---------------- //
- // construct the assignment of tasks to clients
+ // construct the assignment of tasks to clients
- final Set<String> allSourceTopics = new HashSet<>();
- final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
- for (final Map.Entry<Integer, TopicsInfo> entry : topicGroups.entrySet()) {
- allSourceTopics.addAll(entry.getValue().sourceTopics);
- sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
- }
+ final Set<String> allSourceTopics = new HashSet<>();
+ final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
+ for (final Map.Entry<Integer, TopicsInfo> entry : topicGroups.entrySet()) {
+ allSourceTopics.addAll(entry.getValue().sourceTopics);
+ sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
+ }
- // get the tasks as partition groups from the partition grouper
- final Map<TaskId, Set<TopicPartition>> partitionsForTask =
- partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
+ // get the tasks as partition groups from the partition grouper
+ final Map<TaskId, Set<TopicPartition>> partitionsForTask =
+ partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
- final Set<TaskId> statefulTasks = new HashSet<>();
+ final Set<TaskId> statefulTasks = new HashSet<>();
- final boolean probingRebalanceNeeded;
- try {
- probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
- } catch (final TaskAssignmentException e) {
- return new GroupAssignment(
- errorAssignment(clientMetadataMap,
- AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
- );
- }
+ final boolean probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
- // ---------------- Step Three ---------------- //
- // construct the global partition assignment per host map
+ // ---------------- Step Three ---------------- //
- final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();
- final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new HashMap<>();
- if (minReceivedMetadataVersion >= 2) {
- populatePartitionsByHostMaps(partitionsByHost, standbyPartitionsByHost, partitionsForTask, clientMetadataMap);
- }
- streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fullMetadata);
+ // construct the global partition assignment per host map
+
+ final Map<HostInfo, Set<TopicPartition>> partitionsByHost = new HashMap<>();
+ final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost = new HashMap<>();
+ if (minReceivedMetadataVersion >= 2) {
+ populatePartitionsByHostMaps(partitionsByHost, standbyPartitionsByHost, partitionsForTask, clientMetadataMap);
+ }
+ streamsMetadataState.onChange(partitionsByHost, standbyPartitionsByHost, fullMetadata);
- // ---------------- Step Four ---------------- //
+ // ---------------- Step Four ---------------- //
- // compute the assignment of tasks to threads within each client and build the final group assignment
+ // compute the assignment of tasks to threads within each client and build the final group assignment
- final Map<String, Assignment> assignment = computeNewAssignment(
+ final Map<String, Assignment> assignment = computeNewAssignment(
statefulTasks,
clientMetadataMap,
partitionsForTask,
@@ -414,9 +401,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
minSupportedMetadataVersion,
versionProbing,
probingRebalanceNeeded
- );
+ );
- return new GroupAssignment(assignment);
+ return new GroupAssignment(assignment);
+ } catch (final MissingSourceTopicException e) {
+ return new GroupAssignment(
+ errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+ );
+ } catch (final TaskAssignmentException e) {
+ return new GroupAssignment(
+ errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())
+ );
+ }
}
/**
@@ -439,7 +435,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
minSupportedMetadataVersion);
} else {
- throw new IllegalStateException(
+ throw new TaskAssignmentException(
"Received a future (version probing) subscription (version: " + futureMetadataVersion
+ ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion
+ ") at the same time."
@@ -471,7 +467,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
log.error("Source topic {} is missing/unknown during rebalance, please make sure all source topics " +
"have been pre-created before starting the Streams application. Returning error {}",
topic, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
- throw new TaskAssignmentException("Missing source topic during assignment.");
+ throw new MissingSourceTopicException("Missing source topic during assignment.");
}
}
for (final InternalTopicConfig topic : topicsInfo.repartitionSourceTopics.values()) {
@@ -554,7 +550,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
} else {
final Integer count = metadata.partitionCountForTopic(sourceTopicName);
if (count == null) {
- throw new IllegalStateException(
+ throw new TaskAssignmentException(
"No partition count found for source topic "
+ sourceTopicName
+ ", but it should have been."
@@ -709,8 +705,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final Map<UUID, ClientMetadata> clientMetadataMap,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Set<TaskId> statefulTasks) {
- if (!statefulTasks.isEmpty())
- throw new IllegalArgumentException("The stateful tasks should not be populated before assigning tasks to clients");
+ if (!statefulTasks.isEmpty()) {
+ throw new TaskAssignmentException("The stateful tasks should not be populated before assigning tasks to clients");
+ }
final Map<TopicPartition, TaskId> taskForPartition = new HashMap<>();
final Map<Integer, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
@@ -1240,7 +1237,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
consumersToFill.offer(consumer);
}
} else {
- throw new IllegalStateException("Ran out of unassigned stateful tasks but some members were not at capacity");
+ throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index f794f08..a46a652 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.internals.StreamThread.State;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.slf4j.Logger;
@@ -52,8 +53,16 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
// NB: all task management is already handled by:
// org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
- log.error("Received error code {}", assignmentErrorCode.get());
+ log.error("Received error code {}", AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA);
throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
+ } else if (assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) {
+ log.info("Received version probing code {}", AssignorError.VERSION_PROBING);
+ } else if (assignmentErrorCode.get() == AssignorError.ASSIGNMENT_ERROR.code()) {
+ log.error("Received error code {}", AssignorError.ASSIGNMENT_ERROR);
+ throw new TaskAssignmentException("Hit an unexpected exception during task assignment phase of rebalance");
+ } else if (assignmentErrorCode.get() != AssignorError.NONE.code()) {
+ log.error("Received unknown error code {}", assignmentErrorCode.get());
+ throw new TaskAssignmentException("Hit an unrecognized exception during rebalance");
}
streamThread.setState(State.PARTITIONS_ASSIGNED);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
index 6234a7d..44e15fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorError.java
@@ -17,8 +17,12 @@
package org.apache.kafka.streams.processor.internals.assignment;
public enum AssignorError {
+ // Note: any new error code added here should be reserved for fatal errors, as the receiving clients are
+ // future-proofed to throw an exception upon an unrecognized error code.
NONE(0),
- INCOMPLETE_SOURCE_TOPIC_METADATA(1);
+ INCOMPLETE_SOURCE_TOPIC_METADATA(1),
+ VERSION_PROBING(2), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
+ ASSIGNMENT_ERROR(3);
private final int code;
AssignorError(final int code) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index fd44b2f..45d150a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
@@ -33,6 +34,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
@@ -49,6 +51,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
@@ -166,7 +169,8 @@ public class StreamsPartitionAssignorTest {
Collections.singletonList(Node.noNode()),
infos,
emptySet(),
- emptySet());
+ emptySet()
+ );
private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
@@ -1182,6 +1186,76 @@ public class StreamsPartitionAssignorTest {
}
@Test
+ public void shouldThrowTimeoutExceptionWhenCreatingRepartitionTopicsTimesOut() {
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+ streamsBuilder.stream("topic1").repartition();
+
+ final String client = "client1";
+ builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
+
+ createDefaultMockTaskManager();
+ EasyMock.replay(taskManager);
+ partitionAssignor.configure(configProps());
+ final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+ time,
+ new StreamsConfig(configProps()),
+ mockClientSupplier.restoreConsumer,
+ false
+ ) {
+ @Override
+ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
+ throw new TimeoutException("KABOOM!");
+ }
+ };
+ partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+ subscriptions.put(client,
+ new Subscription(
+ singletonList("topic1"),
+ defaultSubscriptionInfo.encode()
+ )
+ );
+ assertThrows(TimeoutException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
+ }
+
+ @Test
+ public void shouldThrowTimeoutExceptionWhenCreatingChangelogTopicsTimesOut() {
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+ streamsBuilder.table("topic1", Materialized.as("store"));
+
+ final String client = "client1";
+ builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
+
+ createDefaultMockTaskManager();
+ EasyMock.replay(taskManager);
+ partitionAssignor.configure(configProps());
+ final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+ time,
+ new StreamsConfig(configProps()),
+ mockClientSupplier.restoreConsumer,
+ false
+ ) {
+ @Override
+ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
+ if (topics.isEmpty()) {
+ return emptySet();
+ }
+ throw new TimeoutException("KABOOM!");
+ }
+ };
+ partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+ subscriptions.put(client,
+ new Subscription(
+ singletonList("topic1"),
+ defaultSubscriptionInfo.encode()
+ )
+ );
+
+ assertThrows(TimeoutException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
+ }
+
+ @Test
public void shouldAddUserDefinedEndPointToSubscription() {
builder.addSource(null, "source", null, null, null, "input");
builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
@@ -1652,13 +1726,13 @@ public class StreamsPartitionAssignorTest {
}
@Test
- public void shouldThrowIfV1SubscriptionAndFutureSubscriptionIsMixed() {
- shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1);
+ public void shouldEncodeAssignmentErrorIfV1SubscriptionAndFutureSubscriptionIsMixed() {
+ shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1);
}
@Test
- public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() {
- shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2);
+ public void shouldEncodeAssignmentErrorIfV2SubscriptionAndFutureSubscriptionIsMixed() {
+ shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2);
}
@Test
@@ -1871,7 +1945,7 @@ public class StreamsPartitionAssignorTest {
);
final Capture<Map<TopicPartition, OffsetSpec>> capturedChangelogs = EasyMock.newCapture();
- expect(adminClient.listOffsets(EasyMock.capture(capturedChangelogs))).andStubReturn(result);
+ expect(adminClient.listOffsets(EasyMock.capture(capturedChangelogs))).andReturn(result).once();
expect(result.all()).andReturn(allFuture);
builder.addSource(null, "source1", null, null, null, "topic1");
@@ -1900,29 +1974,32 @@ public class StreamsPartitionAssignorTest {
@Test
public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
final Set<TopicPartition> changelogs = mkSet(
- new TopicPartition(APPLICATION_ID + "-store-changelog", 0),
- new TopicPartition(APPLICATION_ID + "-store-changelog", 1),
- new TopicPartition(APPLICATION_ID + "-store-changelog", 2)
+ new TopicPartition("topic1", 0),
+ new TopicPartition("topic1", 1),
+ new TopicPartition("topic1", 2)
);
final StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.table("topic1", Materialized.as("store"));
+ final Properties props = new Properties();
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+ builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build(props));
+
subscriptions.put("consumer10",
new Subscription(
singletonList("topic1"),
defaultSubscriptionInfo.encode()
));
- final Consumer<byte[], byte[]> consumerClient = EasyMock.createMock(Consumer.class);
-
createDefaultMockTaskManager();
+ final Consumer<byte[], byte[]> consumerClient = EasyMock.createMock(Consumer.class);
EasyMock.expect(taskManager.mainConsumer()).andStubReturn(consumerClient);
configurePartitionAssignorWith(singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE));
overwriteInternalTopicManagerWithMock(false);
- EasyMock.expect(consumerClient.committed(changelogs))
- .andStubReturn(changelogs.stream().collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(Long.MAX_VALUE))));
+ EasyMock.expect(consumerClient.committed(EasyMock.eq(changelogs)))
+ .andReturn(changelogs.stream().collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(Long.MAX_VALUE)))).once();
EasyMock.replay(consumerClient);
partitionAssignor.assign(metadata, new GroupSubscription(subscriptions));
@@ -1931,6 +2008,29 @@ public class StreamsPartitionAssignorTest {
}
@Test
+ public void shouldEncodeMissingSourceTopicError() {
+ final Cluster emptyClusterMetadata = new Cluster(
+ "cluster",
+ Collections.singletonList(Node.noNode()),
+ emptyList(),
+ emptySet(),
+ emptySet()
+ );
+
+ builder.addSource(null, "source1", null, null, null, "topic1");
+ configureDefault();
+
+ subscriptions.put("consumer",
+ new Subscription(
+ singletonList("topic"),
+ defaultSubscriptionInfo.encode()
+ ));
+ final Map<String, Assignment> assignments = partitionAssignor.assign(emptyClusterMetadata, new GroupSubscription(subscriptions)).groupAssignment();
+ assertThat(AssignmentInfo.decode(assignments.get("consumer").userData()).errCode(),
+ equalTo(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()));
+ }
+
+ @Test
public void testUniqueField() {
createDefaultMockTaskManager();
configureDefaultPartitionAssignor();
@@ -1941,7 +2041,6 @@ public class StreamsPartitionAssignorTest {
assertEquals(1, partitionAssignor.uniqueField());
partitionAssignor.subscriptionUserData(topics);
assertEquals(2, partitionAssignor.uniqueField());
-
}
@Test
@@ -1965,7 +2064,7 @@ public class StreamsPartitionAssignorTest {
return buf;
}
- private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) {
+ private void shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) {
subscriptions.put("consumer1",
new Subscription(
Collections.singletonList("topic1"),
@@ -1978,7 +2077,10 @@ public class StreamsPartitionAssignorTest {
);
configureDefault();
- assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
+ final Map<String, Assignment> assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+ assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).errCode(), equalTo(AssignorError.ASSIGNMENT_ERROR.code()));
+ assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()).errCode(), equalTo(AssignorError.ASSIGNMENT_ERROR.code()));
}
private static Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
index b8ccc94..aecab44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.internals.StreamThread.State;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.junit.Before;
@@ -71,6 +72,42 @@ public class StreamsRebalanceListenerTest {
}
@Test
+ public void shouldSwallowVersionProbingError() {
+ expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andStubReturn(State.PARTITIONS_REVOKED);
+ taskManager.handleRebalanceComplete();
+ replay(taskManager, streamThread);
+ assignmentErrorCode.set(AssignorError.VERSION_PROBING.code());
+ streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
+ verify(taskManager, streamThread);
+ }
+
+ @Test
+ public void shouldThrowTaskAssignmentException() {
+ replay(taskManager, streamThread);
+ assignmentErrorCode.set(AssignorError.ASSIGNMENT_ERROR.code());
+
+ final TaskAssignmentException exception = assertThrows(
+ TaskAssignmentException.class,
+ () -> streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList())
+ );
+ assertThat(exception.getMessage(), is("Hit an unexpected exception during task assignment phase of rebalance"));
+ verify(taskManager, streamThread);
+ }
+
+ @Test
+ public void shouldThrowTaskAssignmentExceptionOnUnrecognizedErrorCode() {
+ replay(taskManager, streamThread);
+ assignmentErrorCode.set(Integer.MAX_VALUE);
+
+ final TaskAssignmentException exception = assertThrows(
+ TaskAssignmentException.class,
+ () -> streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList())
+ );
+ assertThat(exception.getMessage(), is("Hit an unrecognized exception during rebalance"));
+ verify(taskManager, streamThread);
+ }
+
+ @Test
public void shouldHandleAssignedPartitions() {
taskManager.handleRebalanceComplete();
expect(streamThread.setState(State.PARTITIONS_ASSIGNED)).andReturn(State.RUNNING);