You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/29 17:48:32 UTC
[1/2] kafka git commit: KAFKA-4117: Stream partitionassignro cleanup
Repository: kafka
Updated Branches:
refs/heads/trunk 0dd9607f9 -> a4ab9d02a
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index e46a016..cd9b7a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -90,13 +90,13 @@ public class StreamPartitionAssignorTest {
private final TaskId task1 = new TaskId(0, 1);
private final TaskId task2 = new TaskId(0, 2);
private final TaskId task3 = new TaskId(0, 3);
- private String userEndPoint = null;
+ private String userEndPoint = "localhost:2171";
private Properties configProps() {
return new Properties() {
{
setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
- setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+ setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
}
@@ -297,12 +297,12 @@ public class StreamPartitionAssignorTest {
TaskId task10 = new TaskId(1, 0);
TaskId task11 = new TaskId(1, 1);
TaskId task12 = new TaskId(1, 2);
+ List<TaskId> tasks = Utils.mkList(task00, task01, task02, task10, task11, task12);
UUID uuid1 = UUID.randomUUID();
UUID uuid2 = UUID.randomUUID();
String client1 = "client1";
-
StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
@@ -323,14 +323,43 @@ public class StreamPartitionAssignorTest {
assertEquals(2, assignments.get("consumer11").partitions().size());
assertEquals(2, assignments.get("consumer20").partitions().size());
- assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size());
- assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size());
- assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size());
+ AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+ AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+ AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+
+ assertEquals(2, info10.activeTasks.size());
+ assertEquals(2, info11.activeTasks.size());
+ assertEquals(2, info20.activeTasks.size());
+
+ Set<TaskId> allTasks = new HashSet<>();
+ allTasks.addAll(info10.activeTasks);
+ allTasks.addAll(info11.activeTasks);
+ allTasks.addAll(info20.activeTasks);
+ assertEquals(new HashSet<>(tasks), allTasks);
// check tasks for state topics
- assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
- assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
- assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
+ Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
+
+ assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1", tasks, topicGroups));
+ assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2", tasks, topicGroups));
+ assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3", tasks, topicGroups));
+ }
+
+ private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, TopologyBuilder.TopicsInfo> topicGroups) {
+ final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
+
+ Set<TaskId> ids = new HashSet<>();
+ for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ Set<String> stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet();
+
+ if (stateChangelogTopics.contains(changelogTopic)) {
+ for (TaskId id : tasks) {
+ if (id.topicGroupId == entry.getKey())
+ ids.add(id);
+ }
+ }
+ }
+ return ids;
}
@Test
@@ -406,48 +435,6 @@ public class StreamPartitionAssignorTest {
assertEquals(allTasks, allStandbyTasks);
}
- private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
-
- // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group.
-
- AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
-
- // check if the number of assigned partitions == the size of active task id list
- assertEquals(assignment.partitions().size(), info.activeTasks.size());
-
- // check if active tasks are consistent
- List<TaskId> activeTasks = new ArrayList<>();
- Set<String> activeTopics = new HashSet<>();
- for (TopicPartition partition : assignment.partitions()) {
- // since default grouper, taskid.partition == partition.partition()
- activeTasks.add(new TaskId(0, partition.partition()));
- activeTopics.add(partition.topic());
- }
- assertEquals(activeTasks, info.activeTasks);
-
- // check if active partitions cover all topics
- assertEquals(allTopics, activeTopics);
-
- // check if standby tasks are consistent
- Set<String> standbyTopics = new HashSet<>();
- for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
- TaskId id = entry.getKey();
- Set<TopicPartition> partitions = entry.getValue();
- for (TopicPartition partition : partitions) {
- // since default grouper, taskid.partition == partition.partition()
- assertEquals(id.partition, partition.partition());
-
- standbyTopics.add(partition.topic());
- }
- }
-
- if (info.standbyTasks.size() > 0)
- // check if standby partitions cover all topics
- assertEquals(allTopics, standbyTopics);
-
- return info;
- }
-
@Test
public void testOnAssignment() throws Exception {
StreamsConfig config = new StreamsConfig(configProps());
@@ -468,16 +455,18 @@ public class StreamPartitionAssignorTest {
partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
List<TaskId> activeTaskList = Utils.mkList(task0, task3);
+ Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
- standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
- standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
+ activeTasks.put(task0, Utils.mkSet(t1p0));
+ activeTasks.put(task3, Utils.mkSet(t2p3));
+ standbyTasks.put(task1, Utils.mkSet(t1p0));
+ standbyTasks.put(task2, Utils.mkSet(t2p0));
AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
partitionAssignor.onAssignment(assignment);
- assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
- assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
+ assertEquals(activeTasks, partitionAssignor.activeTasks());
assertEquals(standbyTasks, partitionAssignor.standbyTasks());
}
@@ -621,7 +610,7 @@ public class StreamPartitionAssignorTest {
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
- final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHostState.get(new HostInfo("localhost", 8080));
+ final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1),
new TopicPartition("topic1", 2)), topicPartitions);
@@ -725,13 +714,54 @@ public class StreamPartitionAssignorTest {
}
+ private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
+
+ // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group.
+
+ AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+ // check if the number of assigned partitions == the size of active task id list
+ assertEquals(assignment.partitions().size(), info.activeTasks.size());
+
+ // check if active tasks are consistent
+ List<TaskId> activeTasks = new ArrayList<>();
+ Set<String> activeTopics = new HashSet<>();
+ for (TopicPartition partition : assignment.partitions()) {
+ // since default grouper, taskid.partition == partition.partition()
+ activeTasks.add(new TaskId(0, partition.partition()));
+ activeTopics.add(partition.topic());
+ }
+ assertEquals(activeTasks, info.activeTasks);
+
+ // check if active partitions cover all topics
+ assertEquals(allTopics, activeTopics);
+
+ // check if standby tasks are consistent
+ Set<String> standbyTopics = new HashSet<>();
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
+ TaskId id = entry.getKey();
+ Set<TopicPartition> partitions = entry.getValue();
+ for (TopicPartition partition : partitions) {
+ // since default grouper, taskid.partition == partition.partition()
+ assertEquals(id.partition, partition.partition());
+
+ standbyTopics.add(partition.topic());
+ }
+ }
+
+ if (info.standbyTasks.size() > 0)
+ // check if standby partitions cover all topics
+ assertEquals(allTopics, standbyTopics);
+
+ return info;
+ }
private class MockInternalTopicManager extends InternalTopicManager {
- public Map<String, Integer> readyTopics = new HashMap<>();
- public MockConsumer<byte[], byte[]> restoreConsumer;
+ Map<String, Integer> readyTopics = new HashMap<>();
+ MockConsumer<byte[], byte[]> restoreConsumer;
- public MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) {
+ MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) {
super();
this.restoreConsumer = restoreConsumer;
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 411e02d..f4772f0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -109,7 +109,7 @@ public class StreamsMetadataStateTest {
new PartitionInfo("topic-three", 0, null, null, null),
new PartitionInfo("topic-four", 0, null, null, null));
- cluster = new Cluster(Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet());
+ cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
discovery = new StreamsMetadataState(builder);
discovery.onChange(hostToPartitions, cluster);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index ce94a23..cfa0e61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -64,7 +64,7 @@ public class AssignmentInfoTest {
final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion));
assertEquals(oldVersion.activeTasks, decoded.activeTasks);
assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
- assertEquals(0, decoded.partitionsByHostState.size()); // should be empty as wasn't in V1
+ assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1
assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
index 4333087..52ca0a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java
@@ -33,23 +33,34 @@ import static org.junit.Assert.assertTrue;
public class TaskAssignorTest {
+ private static Map<Integer, ClientState<Integer>> copyStates(Map<Integer, ClientState<Integer>> states) {
+ Map<Integer, ClientState<Integer>> copy = new HashMap<>();
+ for (Map.Entry<Integer, ClientState<Integer>> entry : states.entrySet()) {
+ copy.put(entry.getKey(), entry.getValue().copy());
+ }
+
+ return copy;
+ }
+
@Test
public void testAssignWithoutStandby() {
- HashMap<Integer, ClientState<Integer>> states = new HashMap<>();
+ HashMap<Integer, ClientState<Integer>> statesWithNoPrevTasks = new HashMap<>();
for (int i = 0; i < 6; i++) {
- states.put(i, new ClientState<Integer>(1d));
+ statesWithNoPrevTasks.put(i, new ClientState<Integer>(1d));
}
Set<Integer> tasks;
- Map<Integer, ClientState<Integer>> assignments;
int numActiveTasks;
int numAssignedTasks;
+ Map<Integer, ClientState<Integer>> states;
+
// # of clients and # of tasks are equal.
+ states = copyStates(statesWithNoPrevTasks);
tasks = mkSet(0, 1, 2, 3, 4, 5);
- assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
+ TaskAssignor.assign(states, tasks, 0);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertEquals(1, assignment.activeTasks.size());
@@ -60,10 +71,11 @@ public class TaskAssignorTest {
// # of clients < # of tasks
tasks = mkSet(0, 1, 2, 3, 4, 5, 6, 7);
- assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 0);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(1 <= assignment.activeTasks.size());
@@ -76,10 +88,11 @@ public class TaskAssignorTest {
// # of clients > # of tasks
tasks = mkSet(0, 1, 2, 3);
- assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 0);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(0 <= assignment.activeTasks.size());
@@ -93,12 +106,12 @@ public class TaskAssignorTest {
@Test
public void testAssignWithStandby() {
- HashMap<Integer, ClientState<Integer>> states = new HashMap<>();
+ HashMap<Integer, ClientState<Integer>> statesWithNoPrevTasks = new HashMap<>();
for (int i = 0; i < 6; i++) {
- states.put(i, new ClientState<Integer>(1d));
+ statesWithNoPrevTasks.put(i, new ClientState<Integer>(1d));
}
Set<Integer> tasks;
- Map<Integer, ClientState<Integer>> assignments;
+ Map<Integer, ClientState<Integer>> states;
int numActiveTasks;
int numAssignedTasks;
@@ -108,8 +121,9 @@ public class TaskAssignorTest {
// 1 standby replicas.
numActiveTasks = 0;
numAssignedTasks = 0;
- assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
- for (ClientState<Integer> assignment : assignments.values()) {
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 1);
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertEquals(1, assignment.activeTasks.size());
@@ -122,10 +136,11 @@ public class TaskAssignorTest {
tasks = mkSet(0, 1, 2, 3, 4, 5, 6, 7);
// 1 standby replicas.
- assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 1);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(1 <= assignment.activeTasks.size());
@@ -140,10 +155,11 @@ public class TaskAssignorTest {
tasks = mkSet(0, 1, 2, 3);
// 1 standby replicas.
- assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 1);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(0 <= assignment.activeTasks.size());
@@ -158,10 +174,11 @@ public class TaskAssignorTest {
tasks = mkSet(0, 1);
// 1 standby replicas.
- assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 1);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(0 <= assignment.activeTasks.size());
@@ -173,10 +190,11 @@ public class TaskAssignorTest {
assertEquals(tasks.size() * 2, numAssignedTasks);
// 2 standby replicas.
- assignments = TaskAssignor.assign(states, tasks, 2, "TaskAssignorTest-TestAssignWithStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 2);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(0 <= assignment.activeTasks.size());
@@ -187,10 +205,11 @@ public class TaskAssignorTest {
assertEquals(tasks.size() * 3, numAssignedTasks);
// 3 standby replicas.
- assignments = TaskAssignor.assign(states, tasks, 3, "TaskAssignorTest-TestAssignWithStandby");
+ states = copyStates(statesWithNoPrevTasks);
+ TaskAssignor.assign(states, tasks, 3);
numActiveTasks = 0;
numAssignedTasks = 0;
- for (ClientState<Integer> assignment : assignments.values()) {
+ for (ClientState<Integer> assignment : states.values()) {
numActiveTasks += assignment.activeTasks.size();
numAssignedTasks += assignment.assignedTasks.size();
assertTrue(0 <= assignment.activeTasks.size());
@@ -205,27 +224,29 @@ public class TaskAssignorTest {
@Test
public void testStickiness() {
List<Integer> tasks;
- Map<Integer, ClientState<Integer>> states;
+ Map<Integer, ClientState<Integer>> statesWithPrevTasks;
Map<Integer, ClientState<Integer>> assignments;
int i;
// # of clients and # of tasks are equal.
+ Map<Integer, ClientState<Integer>> states;
tasks = mkList(0, 1, 2, 3, 4, 5);
Collections.shuffle(tasks);
- states = new HashMap<>();
+ statesWithPrevTasks = new HashMap<>();
i = 0;
for (int task : tasks) {
ClientState<Integer> state = new ClientState<>(1d);
state.prevActiveTasks.add(task);
state.prevAssignedTasks.add(task);
- states.put(i++, state);
+ statesWithPrevTasks.put(i++, state);
}
- assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5), 0, "TaskAssignorTest-TestStickiness");
+ states = copyStates(statesWithPrevTasks);
+ TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5), 0);
for (int client : states.keySet()) {
- Set<Integer> oldActive = states.get(client).prevActiveTasks;
- Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
- Set<Integer> newActive = assignments.get(client).activeTasks;
- Set<Integer> newAssigned = assignments.get(client).assignedTasks;
+ Set<Integer> oldActive = statesWithPrevTasks.get(client).prevActiveTasks;
+ Set<Integer> oldAssigned = statesWithPrevTasks.get(client).prevAssignedTasks;
+ Set<Integer> newActive = states.get(client).activeTasks;
+ Set<Integer> newAssigned = states.get(client).assignedTasks;
assertEquals(oldActive, newActive);
assertEquals(oldAssigned, newAssigned);
@@ -234,7 +255,7 @@ public class TaskAssignorTest {
// # of clients > # of tasks
tasks = mkList(0, 1, 2, 3, -1, -1);
Collections.shuffle(tasks);
- states = new HashMap<>();
+ statesWithPrevTasks = new HashMap<>();
i = 0;
for (int task : tasks) {
ClientState<Integer> state = new ClientState<>(1d);
@@ -242,14 +263,15 @@ public class TaskAssignorTest {
state.prevActiveTasks.add(task);
state.prevAssignedTasks.add(task);
}
- states.put(i++, state);
+ statesWithPrevTasks.put(i++, state);
}
- assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3), 0, "TaskAssignorTest-TestStickiness");
+ states = copyStates(statesWithPrevTasks);
+ TaskAssignor.assign(states, mkSet(0, 1, 2, 3), 0);
for (int client : states.keySet()) {
- Set<Integer> oldActive = states.get(client).prevActiveTasks;
- Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
- Set<Integer> newActive = assignments.get(client).activeTasks;
- Set<Integer> newAssigned = assignments.get(client).assignedTasks;
+ Set<Integer> oldActive = statesWithPrevTasks.get(client).prevActiveTasks;
+ Set<Integer> oldAssigned = statesWithPrevTasks.get(client).prevAssignedTasks;
+ Set<Integer> newActive = states.get(client).activeTasks;
+ Set<Integer> newAssigned = states.get(client).assignedTasks;
assertEquals(oldActive, newActive);
assertEquals(oldAssigned, newAssigned);
@@ -258,20 +280,21 @@ public class TaskAssignorTest {
// # of clients < # of tasks
List<Set<Integer>> taskSets = mkList(mkSet(0, 1), mkSet(2, 3), mkSet(4, 5), mkSet(6, 7), mkSet(8, 9), mkSet(10, 11));
Collections.shuffle(taskSets);
- states = new HashMap<>();
+ statesWithPrevTasks = new HashMap<>();
i = 0;
for (Set<Integer> taskSet : taskSets) {
ClientState<Integer> state = new ClientState<>(1d);
state.prevActiveTasks.addAll(taskSet);
state.prevAssignedTasks.addAll(taskSet);
- states.put(i++, state);
+ statesWithPrevTasks.put(i++, state);
}
- assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 0, "TaskAssignorTest-TestStickiness");
+ states = copyStates(statesWithPrevTasks);
+ TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 0);
for (int client : states.keySet()) {
- Set<Integer> oldActive = states.get(client).prevActiveTasks;
- Set<Integer> oldAssigned = states.get(client).prevAssignedTasks;
- Set<Integer> newActive = assignments.get(client).activeTasks;
- Set<Integer> newAssigned = assignments.get(client).assignedTasks;
+ Set<Integer> oldActive = statesWithPrevTasks.get(client).prevActiveTasks;
+ Set<Integer> oldAssigned = statesWithPrevTasks.get(client).prevAssignedTasks;
+ Set<Integer> newActive = states.get(client).activeTasks;
+ Set<Integer> newAssigned = states.get(client).assignedTasks;
Set<Integer> intersection = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 7675f9b..4e8a497 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -66,8 +66,10 @@ public class StoreChangeLoggerTest {
}
};
+ @SuppressWarnings("unchecked")
@Test
- public void testAddRemove() {
+ public void testAddRemove() throws Exception {
+
context.setTime(1);
written.put(0, "zero");
changeLogger.add(0);
@@ -75,26 +77,27 @@ public class StoreChangeLoggerTest {
changeLogger.add(1);
written.put(2, "two");
changeLogger.add(2);
- assertEquals(3, changeLogger.numDirty());
- assertEquals(0, changeLogger.numRemoved());
+
+ assertEquals(3, changeLogger.dirty.size());
+ assertEquals(0, changeLogger.removed.size());
changeLogger.delete(0);
changeLogger.delete(1);
written.put(3, "three");
changeLogger.add(3);
- assertEquals(2, changeLogger.numDirty());
- assertEquals(2, changeLogger.numRemoved());
+ assertEquals(2, changeLogger.dirty.size());
+ assertEquals(2, changeLogger.removed.size());
written.put(0, "zero-again");
changeLogger.add(0);
- assertEquals(3, changeLogger.numDirty());
- assertEquals(1, changeLogger.numRemoved());
+ assertEquals(3, changeLogger.dirty.size());
+ assertEquals(1, changeLogger.removed.size());
written.put(4, "four");
changeLogger.add(4);
changeLogger.maybeLogChange(getter);
- assertEquals(0, changeLogger.numDirty());
- assertEquals(0, changeLogger.numRemoved());
+ assertEquals(0, changeLogger.dirty.size());
+ assertEquals(0, changeLogger.removed.size());
assertEquals(5, logged.size());
assertEquals("zero-again", logged.get(0));
assertEquals(null, logged.get(1));
[2/2] kafka git commit: KAFKA-4117: Stream partitionassignro cleanup
Posted by gu...@apache.org.
KAFKA-4117: Stream partitionassignro cleanup
1. Create a new `ClientMetadata` to collapse `Set<String> consumerMemberIds`, `ClientState<TaskId> state`, and `HostInfo hostInfo`.
2. Stop reusing `stateChangelogTopicToTaskIds` and `internalSourceTopicToTaskIds` to access the (sub-)topology's internal repartition and changelog topics for clarity; also use the source topics' num.partitions to set the num.partitions for repartition topics, and clarify to NOT have cycles since otherwise the while loop will fail.
3. `ensure-copartition` at the end to modify the number of partitions for repartition topics if necessary to be equal to other co-partition topics.
4. Refactor `ClientState` as well and update the logic of `TaskAssignor` for clarity as well.
5. Change default `clientId` from `applicationId-suffix` to `applicationId-processId` where `processId` is a UUID to avoid conflicts of clientIds that are from different JVMs, and hence conflicts in metrics.
6. Enforce `assignment` partitions to have the same size, and hence 1-1 mapping to `activeTask` taskIds.
7. Remove the `AssignmentSupplier` class by always constructing the `partitionsByHostState` before assigning tasks to consumers within a client.
8. Remove all unnecessary member variables in `StreamPartitionAssignor`.
9. Some other minor fixes on unit tests, e.g. remove `test only` functions with java field reflection.
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Xavier Léauté, Matthias J. Sax, Eno Thereska, Jason Gustafson
Closes #2012 from guozhangwang/K4117-stream-partitionassignro-cleanup
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a4ab9d02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a4ab9d02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a4ab9d02
Branch: refs/heads/trunk
Commit: a4ab9d02a22e77f0ebca0450e608898d83d4fe18
Parents: 0dd9607
Author: Guozhang Wang <wa...@gmail.com>
Authored: Sat Oct 29 10:48:17 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Oct 29 10:48:17 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/common/Cluster.java | 20 +-
.../org/apache/kafka/streams/KafkaStreams.java | 4 +-
.../streams/processor/TopologyBuilder.java | 8 +-
.../internals/InternalTopicConfig.java | 3 +-
.../streams/processor/internals/SinkNode.java | 5 -
.../streams/processor/internals/SourceNode.java | 5 -
.../internals/StreamPartitionAssignor.java | 717 ++++++++++---------
.../processor/internals/StreamThread.java | 39 +-
.../internals/assignment/AssignmentInfo.java | 14 +-
.../internals/assignment/ClientState.java | 16 +-
.../internals/assignment/TaskAssignor.java | 37 +-
.../apache/kafka/streams/state/HostInfo.java | 2 +-
.../state/internals/StoreChangeLogger.java | 10 -
.../integration/FanoutIntegrationTest.java | 1 -
.../kstream/internals/KStreamImplTest.java | 2 +-
.../kstream/internals/KTableImplTest.java | 16 +-
.../streams/processor/TopologyBuilderTest.java | 4 +-
.../internals/StreamPartitionAssignorTest.java | 148 ++--
.../internals/StreamsMetadataStateTest.java | 2 +-
.../assignment/AssignmentInfoTest.java | 2 +-
.../internals/assignment/TaskAssignorTest.java | 115 +--
.../state/internals/StoreChangeLoggerTest.java | 21 +-
22 files changed, 642 insertions(+), 549 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 471ae26..3c6475d 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -207,6 +207,16 @@ public final class Cluster {
}
/**
+ * Get the number of partitions for the given topic
+ * @param topic The topic to get the number of partitions for
+ * @return The number of partitions or null if there is no corresponding metadata
+ */
+ public Integer partitionCountForTopic(String topic) {
+ List<PartitionInfo> partitions = this.partitionsByTopic.get(topic);
+ return partitions == null ? null : partitions.size();
+ }
+
+ /**
* Get the list of available partitions for this topic
* @param topic The topic name
* @return A list of partitions
@@ -225,16 +235,6 @@ public final class Cluster {
}
/**
- * Get the number of partitions for the given topic
- * @param topic The topic to get the number of partitions for
- * @return The number of partitions or null if there is no corresponding metadata
- */
- public Integer partitionCountForTopic(String topic) {
- List<PartitionInfo> partitionInfos = this.partitionsByTopic.get(topic);
- return partitionInfos == null ? null : partitionInfos.size();
- }
-
- /**
* Get all topics.
* @return a set of all topics
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2333db7..e120b31 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -47,7 +47,6 @@ import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
@@ -91,7 +90,6 @@ import java.util.concurrent.atomic.AtomicInteger;
public class KafkaStreams {
private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
- private static final AtomicInteger STREAM_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.streams";
// container states
@@ -156,7 +154,7 @@ public class KafkaStreams {
String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
- clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
+ clientId = applicationId + "-" + processId;
final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 81f1f63..ecac8c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -227,14 +227,14 @@ public class TopologyBuilder {
public static class TopicsInfo {
public Set<String> sinkTopics;
public Set<String> sourceTopics;
- public Map<String, InternalTopicConfig> interSourceTopics;
public Map<String, InternalTopicConfig> stateChangelogTopics;
+ public Map<String, InternalTopicConfig> repartitionSourceTopics;
- public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> interSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
+ public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
this.sinkTopics = sinkTopics;
this.sourceTopics = sourceTopics;
- this.interSourceTopics = interSourceTopics;
this.stateChangelogTopics = stateChangelogTopics;
+ this.repartitionSourceTopics = repartitionSourceTopics;
}
@Override
@@ -258,7 +258,7 @@ public class TopologyBuilder {
return "TopicsInfo{" +
"sinkTopics=" + sinkTopics +
", sourceTopics=" + sourceTopics +
- ", interSourceTopics=" + interSourceTopics +
+ ", repartitionSourceTopics=" + repartitionSourceTopics +
", stateChangelogTopics=" + stateChangelogTopics +
'}';
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
index 45016c8..c23f134 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -30,9 +30,10 @@ public class InternalTopicConfig {
private final String name;
private final Map<String, String> logConfig;
- private Long retentionMs;
private final Set<CleanupPolicy> cleanupPolicies;
+ private Long retentionMs;
+
public InternalTopicConfig(final String name, final Set<CleanupPolicy> defaultCleanupPolicies, final Map<String, String> logConfig) {
Objects.requireNonNull(name, "name can't be null");
if (defaultCleanupPolicies.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 2b5692d..4e56f61 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -77,11 +77,6 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
// do nothing
}
- // for test only
- public Serializer<V> valueSerializer() {
- return valSerializer;
- }
-
/**
* @return a string representation of this node, useful for debugging.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 4bc3a53..e17509b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -71,11 +71,6 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
// do nothing
}
- // for test only
- public Deserializer<V> valueDeserializer() {
- return valDeserializer;
- }
-
/**
* @return a string representation of this node, useful for debugging.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index dcba543..009ba1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -46,37 +46,92 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
- private String userEndPointConfig;
- private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
- private Cluster metadataWithInternalTopics;
-
private static class AssignedPartition implements Comparable<AssignedPartition> {
public final TaskId taskId;
public final TopicPartition partition;
- public AssignedPartition(TaskId taskId, TopicPartition partition) {
+ AssignedPartition(final TaskId taskId, final TopicPartition partition) {
this.taskId = taskId;
this.partition = partition;
}
@Override
- public int compareTo(AssignedPartition that) {
+ public int compareTo(final AssignedPartition that) {
return PARTITION_COMPARATOR.compare(this.partition, that.partition);
}
}
+ private static class ClientMetadata {
+ final HostInfo hostInfo;
+ final Set<String> consumers;
+ final ClientState<TaskId> state;
+
+ ClientMetadata(final String endPoint) {
+
+ // get the host info if possible
+ if (endPoint != null) {
+ final String host = getHost(endPoint);
+ final Integer port = getPort(endPoint);
+
+ if (host == null || port == null)
+ throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
+
+ hostInfo = new HostInfo(host, port);
+ } else {
+ hostInfo = null;
+ }
+
+ // initialize the consumer memberIds
+ consumers = new HashSet<>();
+
+ // initialize the client state
+ state = new ClientState<>();
+ }
+
+ void addConsumer(final String consumerMemberId, final SubscriptionInfo info) {
+
+ consumers.add(consumerMemberId);
+
+ state.prevActiveTasks.addAll(info.prevTasks);
+ state.prevAssignedTasks.addAll(info.prevTasks);
+ state.prevAssignedTasks.addAll(info.standbyTasks);
+ state.capacity = state.capacity + 1d;
+ }
+
+ @Override
+ public String toString() {
+ return "ClientMetadata{" +
+ "hostInfo=" + hostInfo +
+ ", consumers=" + consumers +
+ ", state=" + state +
+ '}';
+ }
+ }
+
+ private static class InternalTopicMetadata {
+ public final InternalTopicConfig config;
+
+ public int numPartitions;
+
+ InternalTopicMetadata(final InternalTopicConfig config) {
+ this.config = config;
+ this.numPartitions = -1;
+ }
+ }
+
private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
@Override
public int compare(TopicPartition p1, TopicPartition p2) {
@@ -92,12 +147,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private StreamThread streamThread;
+ private String userEndPoint;
private int numStandbyReplicas;
- private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
- private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
- private Map<InternalTopicConfig, Set<TaskId>> stateChangelogTopicToTaskIds;
- private Map<InternalTopicConfig, Set<TaskId>> internalSourceTopicToTaskIds;
+
+ private Cluster metadataWithInternalTopics;
+ private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
+
private Map<TaskId, Set<TopicPartition>> standbyTasks;
+ private Map<TaskId, Set<TopicPartition>> activeTasks;
private InternalTopicManager internalTopicManager;
@@ -111,7 +168,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
public void configure(Map<String, ?> configs) {
numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
-
Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
if (o == null) {
KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -130,21 +186,20 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (userEndPoint != null && !userEndPoint.isEmpty()) {
- final String[] hostPort = userEndPoint.split(":");
- if (hostPort.length != 2) {
- throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair" +
- " but received %s",
- streamThread.getName(), StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
- } else {
- try {
- Integer.valueOf(hostPort[1]);
- this.userEndPointConfig = userEndPoint;
- } catch (NumberFormatException nfe) {
- throw new ConfigException(String.format("stream-thread [%s] Invalid port %s supplied in %s for config %s",
- streamThread.getName(), hostPort[1], userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
- }
+ try {
+ String host = getHost(userEndPoint);
+ Integer port = getPort(userEndPoint);
+
+ if (host == null || port == null)
+ throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair" +
+ " but received %s",
+ streamThread.getName(), StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+ } catch (NumberFormatException nfe) {
+ throw new ConfigException(String.format("stream-thread [%s] Invalid port supplied in %s for config %s",
+ streamThread.getName(), userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
}
+ this.userEndPoint = userEndPoint;
}
if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
@@ -174,7 +229,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
Set<TaskId> prevTasks = streamThread.prevTasks();
Set<TaskId> standbyTasks = streamThread.cachedTasks();
standbyTasks.removeAll(prevTasks);
- SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
+ SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPoint);
if (streamThread.builder.sourceTopicPattern() != null) {
SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
@@ -187,238 +242,252 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
return new Subscription(new ArrayList<>(topics), data.encode());
}
- /**
- * Internal helper function that creates a Kafka topic
- * @param topicToTaskIds Map that contains the topic names to be created
- * @param postPartitionPhase If true, the computation for calculating the number of partitions
- * is slightly different. Set to true after the initial topic-to-partition
- * assignment.
- * @return
+ /*
+ * This assigns tasks to consumer clients in the following steps.
+ *
+ * 0. check all repartition source topics and use internal topic manager to make sure
+ * they have been created with the right number of partitions.
+ *
+ * 1. using user customized partition grouper to generate tasks along with their
+ * assigned partitions; also make sure that the task's corresponding changelog topics
+ * have been created with the right number of partitions.
+ *
+ * 2. using TaskAssignor to assign tasks to consumer clients.
+ * - Assign a task to a client which was running it previously.
+ * If there is no such client, assign a task to a client which has its valid local state.
+ * - A client may have more than one stream threads.
+ * The assignor tries to assign tasks to a client proportionally to the number of threads.
+ * - We try not to assign the same set of tasks to two different clients
+ * We do the assignment in one-pass. The result may not satisfy above all.
+ *
+ * 3. within each client, tasks are assigned to consumer clients in a round-robin manner.
*/
- private Map<TopicPartition, PartitionInfo> prepareTopic(Map<InternalTopicConfig, Set<TaskId>> topicToTaskIds,
- boolean postPartitionPhase) {
- Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>();
- // if ZK is specified, prepare the internal source topic before calling partition grouper
- if (internalTopicManager != null) {
- log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName());
-
- for (Map.Entry<InternalTopicConfig, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
- InternalTopicConfig topic = entry.getKey();
- int numPartitions = 0;
- if (postPartitionPhase) {
- // the expected number of partitions is the max value of TaskId.partition + 1
- for (TaskId task : entry.getValue()) {
- if (numPartitions < task.partition + 1)
- numPartitions = task.partition + 1;
- }
- } else {
- // should have size 1 only
- numPartitions = -1;
- for (TaskId task : entry.getValue()) {
- numPartitions = task.partition;
- }
- }
-
- internalTopicManager.makeReady(topic, numPartitions);
-
- // wait until the topic metadata has been propagated to all brokers
- List<PartitionInfo> partitions;
- do {
- partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
- } while (partitions == null || partitions.size() != numPartitions);
-
- for (PartitionInfo partition : partitions)
- partitionInfos.put(new TopicPartition(partition.topic(), partition.partition()), partition);
- }
-
- log.info("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName());
- } else {
- List<String> missingTopics = new ArrayList<>();
- for (InternalTopicConfig topic : topicToTaskIds.keySet()) {
- List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
- if (partitions == null) {
- missingTopics.add(topic.name());
- }
- }
- if (!missingTopics.isEmpty()) {
- log.warn("stream-thread [{}] Topic {} do not exists but couldn't created as the config '{}' isn't supplied",
- streamThread.getName(), missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
-
- }
- }
-
- return partitionInfos;
- }
-
@Override
public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
- // This assigns tasks to consumer clients in two steps.
- // 1. using TaskAssignor to assign tasks to consumer clients.
- // - Assign a task to a client which was running it previously.
- // If there is no such client, assign a task to a client which has its valid local state.
- // - A client may have more than one stream threads.
- // The assignor tries to assign tasks to a client proportionally to the number of threads.
- // - We try not to assign the same set of tasks to two different clients
- // We do the assignment in one-pass. The result may not satisfy above all.
- // 2. within each client, tasks are assigned to consumer clients in round-robin manner.
- Map<UUID, Set<String>> consumersByClient = new HashMap<>();
- Map<UUID, ClientState<TaskId>> states = new HashMap<>();
- Map<UUID, HostInfo> consumerEndPointMap = new HashMap<>();
- // decode subscription info
+
+ // construct the client metadata from the decoded subscription info
+ Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
+
for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
String consumerId = entry.getKey();
Subscription subscription = entry.getValue();
-
SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
- if (info.userEndPoint != null) {
- final String[] hostPort = info.userEndPoint.split(":");
- consumerEndPointMap.put(info.processId, new HostInfo(hostPort[0], Integer.valueOf(hostPort[1])));
- }
- Set<String> consumers = consumersByClient.get(info.processId);
- if (consumers == null) {
- consumers = new HashSet<>();
- consumersByClient.put(info.processId, consumers);
- }
- consumers.add(consumerId);
- ClientState<TaskId> state = states.get(info.processId);
- if (state == null) {
- state = new ClientState<>();
- states.put(info.processId, state);
+ // create the new client metadata if necessary
+ ClientMetadata clientMetadata = clientsMetadata.get(info.processId);
+
+ if (clientMetadata == null) {
+ clientMetadata = new ClientMetadata(info.userEndPoint);
+ clientsMetadata.put(info.processId, clientMetadata);
}
- state.prevActiveTasks.addAll(info.prevTasks);
- state.prevAssignedTasks.addAll(info.prevTasks);
- state.prevAssignedTasks.addAll(info.standbyTasks);
- state.capacity = state.capacity + 1d;
+ // add the consumer to the client
+ clientMetadata.addConsumer(consumerId, info);
}
+ log.info("stream-thread [{}] Constructed client metadata {} from the member subscriptions.", streamThread.getName(), clientsMetadata);
- this.topicGroups = streamThread.builder.topicGroups();
+ // ---------------- Step Zero ---------------- //
- // ensure the co-partitioning topics within the group have the same number of partitions,
- // and enforce the number of partitions for those internal topics.
- internalSourceTopicToTaskIds = new HashMap<>();
- Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
- Map<Integer, Collection<InternalTopicConfig>> internalSourceTopicGroups = new HashMap<>();
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
- sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
- internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics.values());
+ // parse the topology to determine the repartition source topics,
+ // making sure they are created with a number of partitions equal to the maximum
+ // across the source topics of the sub-topologies that depend on them
+ Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
+
+ Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
+ for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+ for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
+ repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
+ }
}
+ boolean numPartitionsNeeded;
+ do {
+ numPartitionsNeeded = false;
+
+ for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+ for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+ int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
+
+ // try set the number of partitions for this repartition topic if it is not set yet
+ if (numPartitions == -1) {
+ for (TopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
+ Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
+
+ if (otherSinkTopics.contains(topicName)) {
+ // if this topic is one of the sink topics of this topology,
+ // use the maximum of all its source topic partitions as the number of partitions
+ for (String sourceTopicName : otherTopicsInfo.sourceTopics) {
+ Integer numPartitionsCandidate;
+ // It is possible the sourceTopic is another internal topic, i.e,
+ // map().join().join(map())
+ if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
+ numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numPartitions;
+ } else {
+ numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName);
+ }
- // for all internal source topics
- // set the number of partitions to the maximum of the depending sub-topologies source topics
- Map<TopicPartition, PartitionInfo> internalPartitionInfos = new HashMap<>();
- Map<String, InternalTopicConfig> allInternalTopics = new HashMap<>();
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
- Map<String, InternalTopicConfig> internalTopics = entry.getValue().interSourceTopics;
- allInternalTopics.putAll(internalTopics);
- for (InternalTopicConfig internalTopic : internalTopics.values()) {
- Set<TaskId> tasks = internalSourceTopicToTaskIds.get(internalTopic);
-
- if (tasks == null) {
- int numPartitions = -1;
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> other : topicGroups.entrySet()) {
- Set<String> otherSinkTopics = other.getValue().sinkTopics;
-
- if (otherSinkTopics.contains(internalTopic.name())) {
- for (String topic : other.getValue().sourceTopics) {
- Integer partitions = null;
- // It is possible the sourceTopic is another internal topic, i.e,
- // map().join().join(map())
- if (allInternalTopics.containsKey(topic)) {
- Set<TaskId> taskIds = internalSourceTopicToTaskIds.get(allInternalTopics.get(topic));
- if (taskIds != null) {
- for (TaskId taskId : taskIds) {
- partitions = taskId.partition;
- }
+ if (numPartitionsCandidate != null && numPartitionsCandidate > numPartitions) {
+ numPartitions = numPartitionsCandidate;
}
- } else {
- partitions = metadata.partitionCountForTopic(topic);
- }
- if (partitions != null && partitions > numPartitions) {
- numPartitions = partitions;
}
}
}
- }
- internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions)));
- for (int partition = 0; partition < numPartitions; partition++) {
- internalPartitionInfos.put(new TopicPartition(internalTopic.name(), partition),
- new PartitionInfo(internalTopic.name(), partition, null, new Node[0], new Node[0]));
+
+ // if we still have not found the right number of partitions,
+ // another iteration is needed
+ if (numPartitions == -1)
+ numPartitionsNeeded = true;
+ else
+ repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
}
}
}
+ } while (numPartitionsNeeded);
+
+ // augment the metadata with the newly computed number of partitions for all the
+ // repartition source topics
+ Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
+ for (Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
+ String topic = entry.getKey();
+ Integer numPartitions = entry.getValue().numPartitions;
+
+ for (int partition = 0; partition < numPartitions; partition++) {
+ allRepartitionTopicPartitions.put(new TopicPartition(topic, partition),
+ new PartitionInfo(topic, partition, null, new Node[0], new Node[0]));
+ }
}
+ // ensure the co-partitioning topics within the group have the same number of partitions,
+ // and enforce the number of partitions for those repartition topics to be the same if they
+ // are co-partitioned as well.
+ ensureCopartitioning(streamThread.builder.copartitionGroups(), repartitionTopicMetadata, metadata);
- Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups();
- ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups,
- metadata.withPartitions(internalPartitionInfos));
-
-
- internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false);
- internalSourceTopicToTaskIds.clear();
+ // make sure the repartition source topics exist with the right number of partitions,
+ // create these topics if necessary
+ prepareTopic(repartitionTopicMetadata);
metadataWithInternalTopics = metadata;
if (internalTopicManager != null)
- metadataWithInternalTopics = metadata.withPartitions(internalPartitionInfos);
+ metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
+
+ log.debug("stream-thread [{}] Created repartition topics {} from the parsed topology.", streamThread.getName(), allRepartitionTopicPartitions.values());
+
+ // ---------------- Step One ---------------- //
// get the tasks as partition groups from the partition grouper
- Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(
- sourceTopicGroups, metadataWithInternalTopics);
+ Set<String> allSourceTopics = new HashSet<>();
+ Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
+ for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ allSourceTopics.addAll(entry.getValue().sourceTopics);
+ sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
+ }
- // add tasks to state change log topic subscribers
- stateChangelogTopicToTaskIds = new HashMap<>();
- for (TaskId task : partitionsForTask.keySet()) {
- final Map<String, InternalTopicConfig> stateChangelogTopics = topicGroups.get(task.topicGroupId).stateChangelogTopics;
- for (InternalTopicConfig topic : stateChangelogTopics.values()) {
- Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(topic);
- if (tasks == null) {
- tasks = new HashSet<>();
- stateChangelogTopicToTaskIds.put(topic, tasks);
+ Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(
+ sourceTopicsByGroup, metadataWithInternalTopics);
+
+ // check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
+ Set<TopicPartition> allAssignedPartitions = new HashSet<>();
+ Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
+ Set<TopicPartition> partitions = entry.getValue();
+ for (TopicPartition partition : partitions) {
+ if (allAssignedPartitions.contains(partition)) {
+ log.warn("stream-thread [{}] Partition {} is assigned to more than one tasks: {}", streamThread.getName(), partition, partitionsForTask);
}
+ }
+ allAssignedPartitions.addAll(partitions);
- tasks.add(task);
+ TaskId id = entry.getKey();
+ Set<TaskId> ids = tasksByTopicGroup.get(id.topicGroupId);
+ if (ids == null) {
+ ids = new HashSet<>();
+ tasksByTopicGroup.put(id.topicGroupId, ids);
+ }
+ ids.add(id);
+ }
+ for (String topic : allSourceTopics) {
+ for (PartitionInfo partitionInfo : metadataWithInternalTopics.partitionsForTopic(topic)) {
+ TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
+ if (!allAssignedPartitions.contains(partition)) {
+ log.warn("stream-thread [{}] Partition {} is not assigned to any tasks: {}", streamThread.getName(), partition, partitionsForTask);
+ }
}
+ }
- final Map<String, InternalTopicConfig> interSourceTopics = topicGroups.get(task.topicGroupId).interSourceTopics;
- for (InternalTopicConfig topic : interSourceTopics.values()) {
- Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topic);
- if (tasks == null) {
- tasks = new HashSet<>();
- internalSourceTopicToTaskIds.put(topic, tasks);
+ // add tasks to state change log topic subscribers
+ Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
+ for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ final int topicGroupId = entry.getKey();
+ final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
+
+ for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
+ // the expected number of partitions is the max value of TaskId.partition + 1
+ int numPartitions = -1;
+ for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
+ if (numPartitions < task.partition + 1)
+ numPartitions = task.partition + 1;
}
- tasks.add(task);
+ InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
+ topicMetadata.numPartitions = numPartitions;
+
+ changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
}
}
+ prepareTopic(changelogTopicMetadata);
+
+ log.debug("stream-thread [{}] Created state changelog topics {} from the parsed topology.", streamThread.getName(), changelogTopicMetadata);
+
+ // ---------------- Step Two ---------------- //
+
// assign tasks to clients
- states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas, streamThread.getName());
+ Map<UUID, ClientState<TaskId>> states = new HashMap<>();
+ for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+ states.put(entry.getKey(), entry.getValue().state);
+ }
+
+ log.debug("stream-thread [{}] Assigning tasks {} to clients {} with number of replicas {}",
+ streamThread.getName(), partitionsForTask.keySet(), states, numStandbyReplicas);
+
+ TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
+
+ log.info("stream-thread [{}] Assigned tasks to clients as {}.", streamThread.getName(), states);
- final List<AssignmentSupplier> assignmentSuppliers = new ArrayList<>();
+ // ---------------- Step Three ---------------- //
+
+ // construct the global partition assignment per host map
+ partitionsByHostState = new HashMap<>();
+ for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+ HostInfo hostInfo = entry.getValue().hostInfo;
+
+ if (hostInfo != null) {
+ final Set<TopicPartition> topicPartitions = new HashSet<>();
+ final ClientState<TaskId> state = entry.getValue().state;
+
+ for (TaskId id : state.assignedTasks) {
+ topicPartitions.addAll(partitionsForTask.get(id));
+ }
+
+ partitionsByHostState.put(hostInfo, topicPartitions);
+ }
+ }
- final Map<HostInfo, Set<TopicPartition>> endPointMap = new HashMap<>();
- for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
- UUID processId = entry.getKey();
- Set<String> consumers = entry.getValue();
- ClientState<TaskId> state = states.get(processId);
+ // within the client, distribute tasks to its owned consumers
+ Map<String, Assignment> assignment = new HashMap<>();
+ for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+ Set<String> consumers = entry.getValue().consumers;
+ ClientState<TaskId> state = entry.getValue().state;
ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
final int numActiveTasks = state.activeTasks.size();
- for (TaskId taskId : state.activeTasks) {
- taskIds.add(taskId);
- }
- for (TaskId id : state.assignedTasks) {
- if (!state.activeTasks.contains(id))
- taskIds.add(id);
- }
- final int numConsumers = consumers.size();
+ taskIds.addAll(state.activeTasks);
+ taskIds.addAll(state.standbyTasks);
+ final int numConsumers = consumers.size();
int i = 0;
for (String consumer : consumers) {
@@ -448,100 +517,58 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
for (AssignedPartition partition : assignedPartitions) {
active.add(partition.taskId);
activePartitions.add(partition.partition);
- HostInfo hostInfo = consumerEndPointMap.get(processId);
- if (hostInfo != null) {
- if (!endPointMap.containsKey(hostInfo)) {
- endPointMap.put(hostInfo, new HashSet<TopicPartition>());
- }
- final Set<TopicPartition> topicPartitions = endPointMap.get(hostInfo);
- topicPartitions.add(partition.partition);
- }
}
-
- assignmentSuppliers.add(new AssignmentSupplier(consumer,
- active,
- standby,
- endPointMap,
- activePartitions));
+ // finally, encode the assignment before sending back to coordinator
+ assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
i++;
}
}
- // if ZK is specified, validate the internal topics again
- prepareTopic(internalSourceTopicToTaskIds, /* compactTopic */ true);
- // change log topics should be compacted
- prepareTopic(stateChangelogTopicToTaskIds, /* compactTopic */ true);
-
- Map<String, Assignment> assignment = new HashMap<>();
- for (AssignmentSupplier assignmentSupplier : assignmentSuppliers) {
- assignment.put(assignmentSupplier.consumer, assignmentSupplier.get());
- }
return assignment;
}
- class AssignmentSupplier {
- private final String consumer;
- private final List<TaskId> active;
- private final Map<TaskId, Set<TopicPartition>> standby;
- private final Map<HostInfo, Set<TopicPartition>> endPointMap;
- private final List<TopicPartition> activePartitions;
-
- AssignmentSupplier(final String consumer,
- final List<TaskId> active,
- final Map<TaskId, Set<TopicPartition>> standby,
- final Map<HostInfo, Set<TopicPartition>> endPointMap,
- final List<TopicPartition> activePartitions) {
- this.consumer = consumer;
- this.active = active;
- this.standby = standby;
- this.endPointMap = endPointMap;
- this.activePartitions = activePartitions;
- }
-
- Assignment get() {
- return new Assignment(activePartitions, new AssignmentInfo(active,
- standby,
- endPointMap).encode());
- }
- }
-
/**
* @throws TaskAssignmentException if there is no task id for one of the partitions specified
*/
@Override
public void onAssignment(Assignment assignment) {
List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
-
Collections.sort(partitions, PARTITION_COMPARATOR);
AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
this.standbyTasks = info.standbyTasks;
+ this.activeTasks = new HashMap<>();
+
+ // the number of assigned partitions should be the same as the number of active tasks; the
+ // task list may contain duplicate ids if one task has more than one assigned partition
+ if (partitions.size() != info.activeTasks.size()) {
+ throw new TaskAssignmentException(
+ String.format("stream-thread [%s] Number of assigned partitions %d is not equal to the number of active taskIds %d" +
+ ", assignmentInfo=%s", streamThread.getName(), partitions.size(), info.activeTasks.size(), info.toString())
+ );
+ }
- Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
- Iterator<TaskId> iter = info.activeTasks.iterator();
- for (TopicPartition partition : partitions) {
- Set<TaskId> taskIds = partitionToTaskIds.get(partition);
- if (taskIds == null) {
- taskIds = new HashSet<>();
- partitionToTaskIds.put(partition, taskIds);
- }
+ for (int i = 0; i < partitions.size(); i++) {
+ TopicPartition partition = partitions.get(i);
+ TaskId id = info.activeTasks.get(i);
- if (iter.hasNext()) {
- taskIds.add(iter.next());
- } else {
- TaskAssignmentException ex = new TaskAssignmentException(
- String.format("stream-thread [%s] failed to find a task id for the partition=%s" +
- ", partitions=%d, assignmentInfo=%s", streamThread.getName(), partition.toString(), partitions.size(), info.toString())
- );
- log.error(ex.getMessage(), ex);
- throw ex;
+ Set<TopicPartition> assignedPartitions = activeTasks.get(id);
+ if (assignedPartitions == null) {
+ assignedPartitions = new HashSet<>();
+ activeTasks.put(id, assignedPartitions);
}
+ assignedPartitions.add(partition);
+ }
+
+ // only need to update the host partitions map if it is not leader
+ if (this.partitionsByHostState == null) {
+ this.partitionsByHostState = info.partitionsByHost;
}
- this.partitionToTaskIds = partitionToTaskIds;
- this.partitionsByHostState = info.partitionsByHostState;
- // only need to build when not coordinator
+
+ // only need to build if it is not leader
if (metadataWithInternalTopics == null) {
final Collection<Set<TopicPartition>> values = partitionsByHostState.values();
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
@@ -558,92 +585,129 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
}
- public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
- if (partitionsByHostState == null) {
- return Collections.emptyMap();
- }
- return Collections.unmodifiableMap(partitionsByHostState);
- }
+ /**
+ * Internal helper function that creates the Kafka topics
+ *
+ * @param topicPartitions Map that contains the topic names to be created with the number of partitions
+ */
+ private void prepareTopic(Map<String, InternalTopicMetadata> topicPartitions) {
+ log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName());
- public Cluster clusterMetadata() {
- if (metadataWithInternalTopics == null) {
- return Cluster.empty();
- }
- return metadataWithInternalTopics;
- }
+ // if ZK is specified, prepare the internal source topic before calling partition grouper
+ if (internalTopicManager != null) {
+ for (Map.Entry<String, InternalTopicMetadata> entry : topicPartitions.entrySet()) {
+ InternalTopicConfig topic = entry.getValue().config;
+ Integer numPartitions = entry.getValue().numPartitions;
- private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Collection<InternalTopicConfig>> internalTopicGroups, Cluster metadata) {
- Map<String, InternalTopicConfig> internalTopics = new HashMap<>();
- for (Collection<InternalTopicConfig> topics : internalTopicGroups.values()) {
- for (InternalTopicConfig topic : topics) {
- internalTopics.put(topic.name(), topic);
+ if (numPartitions < 0)
+ throw new TopologyBuilderException(String.format("stream-thread [%s] Topic [%s] number of partitions not defined", streamThread.getName(), topic.name()));
+
+ internalTopicManager.makeReady(topic, numPartitions);
+
+ // wait until the topic metadata has been propagated to all brokers
+ List<PartitionInfo> partitions;
+ do {
+ partitions = streamThread.restoreConsumer.partitionsFor(topic.name());
+ } while (partitions == null || partitions.size() != numPartitions);
+ }
+ } else {
+ List<String> missingTopics = new ArrayList<>();
+ for (String topic : topicPartitions.keySet()) {
+ List<PartitionInfo> partitions = streamThread.restoreConsumer.partitionsFor(topic);
+ if (partitions == null) {
+ missingTopics.add(topic);
+ }
+ }
+
+ if (!missingTopics.isEmpty()) {
+ log.warn("stream-thread [{}] Topic {} do not exists but couldn't created as the config '{}' isn't supplied",
+ streamThread.getName(), missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
}
}
+ log.info("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName());
+ }
+
+ private void ensureCopartitioning(Collection<Set<String>> copartitionGroups,
+ Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
+ Cluster metadata) {
for (Set<String> copartitionGroup : copartitionGroups) {
- ensureCopartitioning(copartitionGroup, internalTopics, metadata);
+ ensureCopartitioning(copartitionGroup, allRepartitionTopicsNumPartitions, metadata);
}
}
- private void ensureCopartitioning(Set<String> copartitionGroup, Map<String, InternalTopicConfig> internalTopics, Cluster metadata) {
+ private void ensureCopartitioning(Set<String> copartitionGroup,
+ Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
+ Cluster metadata) {
int numPartitions = -1;
for (String topic : copartitionGroup) {
- if (!internalTopics.containsKey(topic)) {
- List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
+ if (!allRepartitionTopicsNumPartitions.containsKey(topic)) {
+ Integer partitions = metadata.partitionCountForTopic(topic);
- if (infos == null)
- throw new TopologyBuilderException(String.format("stream-thread [%s] External source topic not found: %s", streamThread.getName(), topic));
+ if (partitions == null)
+ throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", streamThread.getName(), topic));
if (numPartitions == -1) {
- numPartitions = infos.size();
- } else if (numPartitions != infos.size()) {
+ numPartitions = partitions;
+ } else if (numPartitions != partitions) {
String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
Arrays.sort(topics);
- throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not copartitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics), ",")));
+ throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics), ",")));
}
}
}
+ // if all topics for this co-partition group are repartition topics,
+ // then set their number of partitions to the maximum found among them.
if (numPartitions == -1) {
- for (InternalTopicConfig topic : internalTopics.values()) {
- if (copartitionGroup.contains(topic.name())) {
- Integer partitions = metadata.partitionCountForTopic(topic.name());
- if (partitions != null && partitions > numPartitions) {
+ for (Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
+ if (copartitionGroup.contains(entry.getKey())) {
+ int partitions = entry.getValue().numPartitions;
+ if (partitions > numPartitions) {
numPartitions = partitions;
}
}
}
}
- // enforce co-partitioning restrictions to internal topics reusing internalSourceTopicToTaskIds
- for (InternalTopicConfig topic : internalTopics.values()) {
- if (copartitionGroup.contains(topic.name())) {
- internalSourceTopicToTaskIds
- .put(topic, Collections.singleton(new TaskId(-1, numPartitions)));
+
+ // enforce co-partitioning restrictions to repartition topics by updating their number of partitions
+ for (Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) {
+ if (copartitionGroup.contains(entry.getKey())) {
+ entry.getValue().numPartitions = numPartitions;
}
}
}
- /* For Test Only */
- public Set<TaskId> tasksForState(String stateName) {
- final String changeLogName = ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName);
- for (InternalTopicConfig internalTopicConfig : stateChangelogTopicToTaskIds.keySet()) {
- if (internalTopicConfig.name().equals(changeLogName)) {
- return stateChangelogTopicToTaskIds.get(internalTopicConfig);
- }
+ Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
+ if (partitionsByHostState == null) {
+ return Collections.emptyMap();
}
- return Collections.emptySet();
+ return Collections.unmodifiableMap(partitionsByHostState);
+ }
+
+ Cluster clusterMetadata() {
+ if (metadataWithInternalTopics == null) {
+ return Cluster.empty();
+ }
+ return metadataWithInternalTopics;
}
- public Set<TaskId> tasksForPartition(TopicPartition partition) {
- return partitionToTaskIds.get(partition);
+ Map<TaskId, Set<TopicPartition>> activeTasks() {
+ if (activeTasks == null) {
+ return Collections.emptyMap();
+ }
+ return Collections.unmodifiableMap(activeTasks);
}
- public Map<TaskId, Set<TopicPartition>> standbyTasks() {
- return standbyTasks;
+ Map<TaskId, Set<TopicPartition>> standbyTasks() {
+ if (standbyTasks == null) {
+ return Collections.emptyMap();
+ }
+ return Collections.unmodifiableMap(standbyTasks);
}
- public void setInternalTopicManager(InternalTopicManager internalTopicManager) {
+ void setInternalTopicManager(InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;
}
@@ -651,11 +715,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
* Used to capture subscribed topic via Patterns discovered during the
* partition assignment process.
*/
- public static class SubscriptionUpdates {
+ public static class SubscriptionUpdates {
private final Set<String> updatedTopicSubscriptions = new HashSet<>();
-
private void updateTopics(Collection<String> topicNames) {
updatedTopicSubscriptions.clear();
updatedTopicSubscriptions.addAll(topicNames);
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d7bb98c..7a04339 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -111,7 +111,6 @@ public class StreamThread extends Thread {
private boolean processStandbyRecords = false;
private AtomicBoolean initialized = new AtomicBoolean(false);
- private final long cacheSizeBytes;
private ThreadCache cache;
final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@@ -184,7 +183,7 @@ public class StreamThread extends Thread {
if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
log.warn("Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", threadName);
}
- this.cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
+ long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.sensors);
@@ -637,34 +636,24 @@ public class StreamThread extends Thread {
if (partitionAssignor == null)
throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
- HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
-
- for (TopicPartition partition : assignment) {
- Set<TaskId> taskIds = partitionAssignor.tasksForPartition(partition);
- for (TaskId taskId : taskIds) {
- Set<TopicPartition> partitions = partitionsForTask.get(taskId);
- if (partitions == null) {
- partitions = new HashSet<>();
- partitionsForTask.put(taskId, partitions);
- }
- partitions.add(partition);
- }
- }
-
// create the active tasks
- for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.activeTasks().entrySet()) {
TaskId taskId = entry.getKey();
Set<TopicPartition> partitions = entry.getValue();
- try {
- StreamTask task = createStreamTask(taskId, partitions);
- activeTasks.put(taskId, task);
+ if (assignment.containsAll(partitions)) {
+ try {
+ StreamTask task = createStreamTask(taskId, partitions);
+ activeTasks.put(taskId, task);
- for (TopicPartition partition : partitions)
- activeTasksByPartition.put(partition, task);
- } catch (StreamsException e) {
- log.error("{} Failed to create an active task %s: ", logPrefix, taskId, e);
- throw e;
+ for (TopicPartition partition : partitions)
+ activeTasksByPartition.put(partition, task);
+ } catch (StreamsException e) {
+ log.error("{} Failed to create an active task %s: ", logPrefix, taskId, e);
+ throw e;
+ }
+ } else {
+ log.warn("{} Task {} owned partitions {} are not contained in the assignment {}", logPrefix, taskId, partitions, assignment);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 6569f85..ddbd67d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -41,7 +41,7 @@ public class AssignmentInfo {
private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
/**
- * A new field was added, partitionsByHostState. CURRENT_VERSION
+ * A new field was added, partitionsByHost. CURRENT_VERSION
* is required so we can decode the previous version. For example, this may occur
* during a rolling upgrade
*/
@@ -49,7 +49,7 @@ public class AssignmentInfo {
public final int version;
public final List<TaskId> activeTasks; // each element corresponds to a partition
public final Map<TaskId, Set<TopicPartition>> standbyTasks;
- public final Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
+ public final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
Map<HostInfo, Set<TopicPartition>> hostState) {
@@ -61,7 +61,7 @@ public class AssignmentInfo {
this.version = version;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
- this.partitionsByHostState = hostState;
+ this.partitionsByHost = hostState;
}
/**
@@ -89,8 +89,8 @@ public class AssignmentInfo {
Set<TopicPartition> partitions = entry.getValue();
writeTopicPartitions(out, partitions);
}
- out.writeInt(partitionsByHostState.size());
- for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHostState
+ out.writeInt(partitionsByHost.size());
+ for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost
.entrySet()) {
final HostInfo hostInfo = entry.getKey();
out.writeUTF(hostInfo.host());
@@ -174,7 +174,7 @@ public class AssignmentInfo {
@Override
public int hashCode() {
- return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHostState.hashCode();
+ return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
}
@Override
@@ -184,7 +184,7 @@ public class AssignmentInfo {
return this.version == other.version &&
this.activeTasks.equals(other.activeTasks) &&
this.standbyTasks.equals(other.standbyTasks) &&
- this.partitionsByHostState.equals(other.partitionsByHostState);
+ this.partitionsByHost.equals(other.partitionsByHost);
} else {
return false;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index b59af86..0746cab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -22,11 +22,12 @@ import java.util.Set;
public class ClientState<T> {
- public final static double COST_ACTIVE = 0.1;
- public final static double COST_STANDBY = 0.2;
- public final static double COST_LOAD = 0.5;
+ final static double COST_ACTIVE = 0.1;
+ final static double COST_STANDBY = 0.2;
+ final static double COST_LOAD = 0.5;
public final Set<T> activeTasks;
+ public final Set<T> standbyTasks;
public final Set<T> assignedTasks;
public final Set<T> prevActiveTasks;
public final Set<T> prevAssignedTasks;
@@ -39,11 +40,12 @@ public class ClientState<T> {
}
public ClientState(double capacity) {
- this(new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), capacity);
+ this(new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), capacity);
}
- private ClientState(Set<T> activeTasks, Set<T> assignedTasks, Set<T> prevActiveTasks, Set<T> prevAssignedTasks, double capacity) {
+ private ClientState(Set<T> activeTasks, Set<T> standbyTasks, Set<T> assignedTasks, Set<T> prevActiveTasks, Set<T> prevAssignedTasks, double capacity) {
this.activeTasks = activeTasks;
+ this.standbyTasks = standbyTasks;
this.assignedTasks = assignedTasks;
this.prevActiveTasks = prevActiveTasks;
this.prevAssignedTasks = prevAssignedTasks;
@@ -52,13 +54,15 @@ public class ClientState<T> {
}
public ClientState<T> copy() {
- return new ClientState<>(new HashSet<>(activeTasks), new HashSet<>(assignedTasks),
+ return new ClientState<>(new HashSet<>(activeTasks), new HashSet<>(standbyTasks), new HashSet<>(assignedTasks),
new HashSet<>(prevActiveTasks), new HashSet<>(prevAssignedTasks), capacity);
}
public void assign(T taskId, boolean active) {
if (active)
activeTasks.add(taskId);
+ else
+ standbyTasks.add(taskId);
assignedTasks.add(taskId);
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
index fadb43f..e807c4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
@@ -33,24 +32,20 @@ public class TaskAssignor<C, T extends Comparable<T>> {
private static final Logger log = LoggerFactory.getLogger(TaskAssignor.class);
- public static <C, T extends Comparable<T>> Map<C, ClientState<T>> assign(Map<C, ClientState<T>> states, Set<T> tasks, int numStandbyReplicas, String streamThreadId) {
+ public static <C, T extends Comparable<T>> void assign(Map<C, ClientState<T>> states, Set<T> tasks, int numStandbyReplicas) {
long seed = 0L;
for (C client : states.keySet()) {
seed += client.hashCode();
}
TaskAssignor<C, T> assignor = new TaskAssignor<>(states, tasks, seed);
- log.info("stream-thread [{}] Assigning tasks to clients: {}, prevAssignmentBalanced: {}, " +
- "prevClientsUnchanged: {}, tasks: {}, replicas: {}",
- streamThreadId, states, assignor.prevAssignmentBalanced, assignor.prevClientsUnchanged,
- tasks, numStandbyReplicas);
+ // assign active tasks
assignor.assignTasks();
+
+ // assign standby tasks
if (numStandbyReplicas > 0)
assignor.assignStandbyTasks(numStandbyReplicas);
-
- log.info("stream-thread [{}] Assigned with: {}", streamThreadId, assignor.states);
- return assignor.states;
}
private final Random rand;
@@ -63,36 +58,38 @@ public class TaskAssignor<C, T extends Comparable<T>> {
private TaskAssignor(Map<C, ClientState<T>> states, Set<T> tasks, long randomSeed) {
this.rand = new Random(randomSeed);
- this.states = new HashMap<>();
+ this.tasks = new ArrayList<>(tasks);
+ this.states = states;
+
int avgNumTasks = tasks.size() / states.size();
Set<T> existingTasks = new HashSet<>();
for (Map.Entry<C, ClientState<T>> entry : states.entrySet()) {
- this.states.put(entry.getKey(), entry.getValue().copy());
Set<T> oldTasks = entry.getValue().prevAssignedTasks;
+
// make sure the previous assignment is balanced
prevAssignmentBalanced = prevAssignmentBalanced &&
oldTasks.size() < 2 * avgNumTasks && oldTasks.size() > avgNumTasks / 2;
+
+ // make sure there are no duplicates
for (T task : oldTasks) {
- // Make sure there is no duplicates
prevClientsUnchanged = prevClientsUnchanged && !existingTasks.contains(task);
}
existingTasks.addAll(oldTasks);
}
- // Make sure the existing assignment didn't miss out any task
- prevClientsUnchanged = prevClientsUnchanged && existingTasks.equals(tasks);
- this.tasks = new ArrayList<>(tasks);
+ // make sure the existing assignment didn't miss out any task
+ prevClientsUnchanged = prevClientsUnchanged && existingTasks.equals(tasks);
int numTasks = tasks.size();
this.maxNumTaskPairs = numTasks * (numTasks - 1) / 2;
this.taskPairs = new HashSet<>(this.maxNumTaskPairs);
}
- public void assignTasks() {
+ private void assignTasks() {
assignTasks(true);
}
- public void assignStandbyTasks(int numStandbyReplicas) {
+ private void assignStandbyTasks(int numStandbyReplicas) {
int numReplicas = Math.min(numStandbyReplicas, states.size() - 1);
for (int i = 0; i < numReplicas; i++) {
assignTasks(false);
@@ -195,10 +192,10 @@ public class TaskAssignor<C, T extends Comparable<T>> {
}
private static class TaskPair<T> {
- public final T task1;
- public final T task2;
+ final T task1;
+ final T task2;
- public TaskPair(T task1, T task2) {
+ TaskPair(T task1, T task2) {
this.task1 = task1;
this.task2 = task2;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
index 37a15e1..ce75d91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -74,7 +74,7 @@ public class HostInfo {
@Override
public String toString() {
return "HostInfo{" +
- "host='" + host + '\'' +
+ "host=\'" + host + '\'' +
", port=" + port +
'}';
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 41f9ae2..83e3d10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -120,14 +120,4 @@ public class StoreChangeLogger<K, V> {
this.removed.clear();
this.dirty.clear();
}
-
- // this is for test only
- public int numDirty() {
- return this.dirty.size();
- }
-
- // this is for test only
- public int numRemoved() {
- return this.removed.size();
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index a5fb076..03df4cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -85,7 +85,6 @@ public class FanoutIntegrationTest {
CLUSTER.createTopic(OUTPUT_TOPIC_C);
}
-
@Parameter
public long cacheSizeBytes;
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index e5e334c..8c96ecb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -206,7 +206,7 @@ public class KStreamImplTest {
@Test(expected = NullPointerException.class)
public void shouldCantHaveNullPredicate() throws Exception {
- testStream.branch(null);
+ testStream.branch((Predicate) null);
}
@Test(expected = NullPointerException.class)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index afa1033..49dcbd0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -40,6 +40,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.lang.reflect.Field;
import java.io.File;
import java.io.IOException;
@@ -337,7 +338,7 @@ public class KTableImplTest {
}
@Test
- public void testRepartition() throws IOException {
+ public void testRepartition() throws Exception {
String topic1 = "topic1";
String storeName1 = "storeName1";
@@ -367,10 +368,15 @@ public class KTableImplTest {
assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
- assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner());
- assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner());
- assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner());
- assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner());
+ Field valSerializerField = ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
+ Field valDeserializerField = ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
+ valSerializerField.setAccessible(true);
+ valDeserializerField.setAccessible(true);
+
+ assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000003"))).inner());
+ assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000004"))).inner());
+ assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000007"))).inner());
+ assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000008"))).inner());
}
@Test(expected = NullPointerException.class)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a4ab9d02/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index d260937..c402c9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -156,7 +156,7 @@ public class TopologyBuilderTest {
builder.addSource("source-3", "topic-3");
builder.addInternalTopic("topic-3");
- Set<String> expected = new HashSet<String>();
+ Set<String> expected = new HashSet<>();
expected.add("topic-1");
expected.add("topic-2");
expected.add("X-topic-3");
@@ -516,7 +516,7 @@ public class TopologyBuilderTest {
builder.addInternalTopic("foo");
builder.addSource("source", "foo");
final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
- final InternalTopicConfig topicConfig = topicsInfo.interSourceTopics.get("appId-foo");
+ final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
final Properties properties = topicConfig.toProperties(0);
assertEquals("appId-foo", topicConfig.name());
assertEquals("delete", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));