Posted to commits@kafka.apache.org by gw...@apache.org on 2016/01/22 22:00:09 UTC
[1/4] kafka git commit: KAFKA-3136: Rename KafkaStreaming to KafkaStreams
Repository: kafka
Updated Branches:
refs/heads/trunk 91ba074e4 -> 21c6cfe50
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
new file mode 100644
index 0000000..15b114a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -0,0 +1,506 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamPartitionAssignorTest {
+
+ private TopicPartition t1p0 = new TopicPartition("topic1", 0);
+ private TopicPartition t1p1 = new TopicPartition("topic1", 1);
+ private TopicPartition t1p2 = new TopicPartition("topic1", 2);
+ private TopicPartition t2p0 = new TopicPartition("topic2", 0);
+ private TopicPartition t2p1 = new TopicPartition("topic2", 1);
+ private TopicPartition t2p2 = new TopicPartition("topic2", 2);
+ private TopicPartition t3p0 = new TopicPartition("topic3", 0);
+ private TopicPartition t3p1 = new TopicPartition("topic3", 1);
+ private TopicPartition t3p2 = new TopicPartition("topic3", 2);
+ private TopicPartition t3p3 = new TopicPartition("topic3", 3);
+
+ private Set<String> allTopics = Utils.mkSet("topic1", "topic2");
+
+ private List<PartitionInfo> infos = Arrays.asList(
+ new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
+ );
+
+ private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+
+ private final TaskId task0 = new TaskId(0, 0);
+ private final TaskId task1 = new TaskId(0, 1);
+ private final TaskId task2 = new TaskId(0, 2);
+ private final TaskId task3 = new TaskId(0, 3);
+
+ private Properties configProps() {
+ return new Properties() {
+ {
+ setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+ setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-partition-assignor-test");
+ setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+ setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+ }
+ };
+ }
+
+ private ByteArraySerializer serializer = new ByteArraySerializer();
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSubscription() throws Exception {
+ StreamsConfig config = new StreamsConfig(configProps());
+
+ MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+ MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource("source1", "topic1");
+ builder.addSource("source2", "topic2");
+ builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+
+ final Set<TaskId> prevTasks = Utils.mkSet(
+ new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
+ final Set<TaskId> cachedTasks = Utils.mkSet(
+ new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
+ new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
+
+ String clientId = "client-id";
+ UUID processId = UUID.randomUUID();
+ StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
+ @Override
+ public Set<TaskId> prevTasks() {
+ return prevTasks;
+ }
+ @Override
+ public Set<TaskId> cachedTasks() {
+ return cachedTasks;
+ }
+ };
+
+ StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));
+
+ PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
+
+ Collections.sort(subscription.topics());
+ assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics());
+
+ Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
+ standbyTasks.removeAll(prevTasks);
+
+ SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
+ assertEquals(info.encode(), subscription.userData());
+ }
+
+ @Test
+ public void testAssignBasic() throws Exception {
+ StreamsConfig config = new StreamsConfig(configProps());
+
+ MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+ MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource("source1", "topic1");
+ builder.addSource("source2", "topic2");
+ builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+ List<String> topics = Utils.mkList("topic1", "topic2");
+ Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
+ final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+ final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
+ final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+ final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
+ final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
+ final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
+
+ UUID uuid1 = UUID.randomUUID();
+ UUID uuid2 = UUID.randomUUID();
+ String client1 = "client1";
+ String client2 = "client2";
+
+ StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+ StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+ Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ subscriptions.put("consumer10",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+ subscriptions.put("consumer11",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+ subscriptions.put("consumer20",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+
+ Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+ // check assigned partitions
+ assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
+ Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
+ assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
+
+ // check assignment info
+
+ Set<TaskId> allActiveTasks = new HashSet<>();
+
+ // the first consumer
+ AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+ allActiveTasks.addAll(info10.activeTasks);
+
+ // the second consumer
+ AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+ allActiveTasks.addAll(info11.activeTasks);
+
+ assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
+
+ // the third consumer
+ AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+ allActiveTasks.addAll(info20.activeTasks);
+
+ assertEquals(3, allActiveTasks.size());
+ assertEquals(allTasks, allActiveTasks);
+ }
+
+ @Test
+ public void testAssignWithNewTasks() throws Exception {
+ StreamsConfig config = new StreamsConfig(configProps());
+
+ MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+ MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource("source1", "topic1");
+ builder.addSource("source2", "topic2");
+ builder.addSource("source3", "topic3");
+ builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
+ List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
+ Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);
+
+ // assuming that previous tasks do not have topic3
+ final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+ final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
+ final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+
+ UUID uuid1 = UUID.randomUUID();
+ UUID uuid2 = UUID.randomUUID();
+ String client1 = "client1";
+ String client2 = "client2";
+
+ StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+ StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+ Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ subscriptions.put("consumer10",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode()));
+ subscriptions.put("consumer11",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode()));
+ subscriptions.put("consumer20",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode()));
+
+ Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+ // check assigned partitions: since there are no previous tasks for topic3, its tasks are assigned randomly, so we cannot check for an exact match.
+ // Note also that previously assigned partitions/tasks may not stay on their previous host: a new task may be assigned first,
+ // and later ones may then be re-assigned to other hosts for load balancing.
+ Set<TaskId> allActiveTasks = new HashSet<>();
+ Set<TopicPartition> allPartitions = new HashSet<>();
+ AssignmentInfo info;
+
+ info = AssignmentInfo.decode(assignments.get("consumer10").userData());
+ allActiveTasks.addAll(info.activeTasks);
+ allPartitions.addAll(assignments.get("consumer10").partitions());
+
+ info = AssignmentInfo.decode(assignments.get("consumer11").userData());
+ allActiveTasks.addAll(info.activeTasks);
+ allPartitions.addAll(assignments.get("consumer11").partitions());
+
+ info = AssignmentInfo.decode(assignments.get("consumer20").userData());
+ allActiveTasks.addAll(info.activeTasks);
+ allPartitions.addAll(assignments.get("consumer20").partitions());
+
+ assertEquals(allTasks, allActiveTasks);
+ assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions);
+ }
+
+ @Test
+ public void testAssignWithStates() throws Exception {
+ StreamsConfig config = new StreamsConfig(configProps());
+
+ MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+ MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.addSource("source1", "topic1");
+ builder.addSource("source2", "topic2");
+
+ builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
+ builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");
+
+ builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
+ builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
+ builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");
+
+ List<String> topics = Utils.mkList("topic1", "topic2");
+
+ TaskId task00 = new TaskId(0, 0);
+ TaskId task01 = new TaskId(0, 1);
+ TaskId task02 = new TaskId(0, 2);
+ TaskId task10 = new TaskId(1, 0);
+ TaskId task11 = new TaskId(1, 1);
+ TaskId task12 = new TaskId(1, 2);
+
+ UUID uuid1 = UUID.randomUUID();
+ UUID uuid2 = UUID.randomUUID();
+ String client1 = "client1";
+ String client2 = "client2";
+
+ StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+ StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+ Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ subscriptions.put("consumer10",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+ subscriptions.put("consumer11",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+ subscriptions.put("consumer20",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+
+ Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+ // check assigned partition sizes: since there are no previous tasks and there are two sub-topologies, the assignment is random, so we cannot check for an exact match
+ assertEquals(2, assignments.get("consumer10").partitions().size());
+ assertEquals(2, assignments.get("consumer11").partitions().size());
+ assertEquals(2, assignments.get("consumer20").partitions().size());
+
+ assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size());
+ assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size());
+ assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size());
+
+ // check tasks for state topics
+ assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
+ assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
+ assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
+ }
+
+ @Test
+ public void testAssignWithStandbyReplicas() throws Exception {
+ Properties props = configProps();
+ props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
+ StreamsConfig config = new StreamsConfig(props);
+
+ MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+ MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource("source1", "topic1");
+ builder.addSource("source2", "topic2");
+ builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+ List<String> topics = Utils.mkList("topic1", "topic2");
+ Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
+
+ final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+ final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
+ final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+ final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
+ final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
+ final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
+
+ UUID uuid1 = UUID.randomUUID();
+ UUID uuid2 = UUID.randomUUID();
+ String client1 = "client1";
+ String client2 = "client2";
+
+ StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+ StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+
+ Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ subscriptions.put("consumer10",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+ subscriptions.put("consumer11",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+ subscriptions.put("consumer20",
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+
+ Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+ Set<TaskId> allActiveTasks = new HashSet<>();
+ Set<TaskId> allStandbyTasks = new HashSet<>();
+
+ // the first consumer
+ AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
+ allActiveTasks.addAll(info10.activeTasks);
+ allStandbyTasks.addAll(info10.standbyTasks.keySet());
+
+ // the second consumer
+ AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
+ allActiveTasks.addAll(info11.activeTasks);
+ allStandbyTasks.addAll(info11.standbyTasks.keySet());
+
+ // check active tasks assigned to the first client
+ assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
+ assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
+
+ // the third consumer
+ AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
+ allActiveTasks.addAll(info20.activeTasks);
+ allStandbyTasks.addAll(info20.standbyTasks.keySet());
+
+ // all task ids are in the active tasks and also in the standby tasks
+
+ assertEquals(3, allActiveTasks.size());
+ assertEquals(allTasks, allActiveTasks);
+
+ assertEquals(3, allStandbyTasks.size());
+ assertEquals(allTasks, allStandbyTasks);
+ }
+
+ private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
+
+ // This assumes that 1) the DefaultPartitionGrouper is used, and 2) there is only one topic group.
+
+ AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+ // check if the number of assigned partitions == the size of active task id list
+ assertEquals(assignment.partitions().size(), info.activeTasks.size());
+
+ // check if active tasks are consistent
+ List<TaskId> activeTasks = new ArrayList<>();
+ Set<String> activeTopics = new HashSet<>();
+ for (TopicPartition partition : assignment.partitions()) {
+ // with the default grouper, taskId.partition == partition.partition()
+ activeTasks.add(new TaskId(0, partition.partition()));
+ activeTopics.add(partition.topic());
+ }
+ assertEquals(activeTasks, info.activeTasks);
+
+ // check if active partitions cover all topics
+ assertEquals(allTopics, activeTopics);
+
+ // check if standby tasks are consistent
+ Set<String> standbyTopics = new HashSet<>();
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
+ TaskId id = entry.getKey();
+ Set<TopicPartition> partitions = entry.getValue();
+ for (TopicPartition partition : partitions) {
+ // with the default grouper, taskId.partition == partition.partition()
+ assertEquals(id.partition, partition.partition());
+
+ standbyTopics.add(partition.topic());
+ }
+ }
+
+ if (info.standbyTasks.size() > 0)
+ // check if standby partitions cover all topics
+ assertEquals(allTopics, standbyTopics);
+
+ return info;
+ }
+
+ @Test
+ public void testOnAssignment() throws Exception {
+ StreamsConfig config = new StreamsConfig(configProps());
+
+ MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+ MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+ TopicPartition t2p3 = new TopicPartition("topic2", 3);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource("source1", "topic1");
+ builder.addSource("source2", "topic2");
+ builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+
+ UUID uuid = UUID.randomUUID();
+ String client1 = "client1";
+
+ StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid, new Metrics(), new SystemTime());
+
+ StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
+
+ List<TaskId> activeTaskList = Utils.mkList(task0, task3);
+ Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+ standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
+ standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
+
+ AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks);
+ PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
+ partitionAssignor.onAssignment(assignment);
+
+ assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
+ assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
+ assertEquals(standbyTasks, partitionAssignor.standbyTasks());
+ }
+
+}
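
For readers skimming the new test above: the assignor protocol it exercises is a metadata roundtrip. Each member advertises its previously owned tasks and standby tasks through a SubscriptionInfo blob in the subscription userData; the group leader answers with an AssignmentInfo blob (active tasks plus standby-task partitions) in the assignment userData. Below is a minimal sketch of that roundtrip, assuming only the constructors and encode/decode calls that appear in the test; the class name, topic names, and task ids are illustrative.

    import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.utils.Utils;
    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
    import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;

    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    public class AssignorMetadataRoundTrip {
        public static void main(String[] args) {
            // Member side: advertise previously owned and standby tasks in the
            // subscription userData.
            UUID processId = UUID.randomUUID();
            Set<TaskId> prevTasks = Utils.mkSet(new TaskId(0, 0));
            Set<TaskId> standbyTasks = Utils.mkSet(new TaskId(0, 1));
            PartitionAssignor.Subscription subscription =
                    new PartitionAssignor.Subscription(
                            Utils.mkList("topic1", "topic2"),
                            new SubscriptionInfo(processId, prevTasks, standbyTasks).encode());
            System.out.println(subscription.topics());

            // Leader side: return active tasks plus standby-task partitions in
            // the assignment userData.
            Map<TaskId, Set<TopicPartition>> standby = Collections.singletonMap(
                    new TaskId(0, 1), Utils.mkSet(new TopicPartition("topic1", 1)));
            PartitionAssignor.Assignment assignment =
                    new PartitionAssignor.Assignment(
                            Utils.mkList(new TopicPartition("topic1", 0)),
                            new AssignmentInfo(Utils.mkList(new TaskId(0, 0)), standby).encode());

            // Member side again: decode what the leader assigned.
            AssignmentInfo decoded = AssignmentInfo.decode(assignment.userData());
            System.out.println(decoded.activeTasks + " / " + decoded.standbyTasks);
        }
    }

This is the shape checkAssignment() verifies: under the default grouper, the assigned partition list and the decoded activeTasks line up one-to-one.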
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1847e85..bf3b3b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.test.MockSourceNode;
@@ -69,17 +69,18 @@ public class StreamTaskTest {
Collections.<StateStoreSupplier>emptyList()
);
- private StreamingConfig createConfig(final File baseDir) throws Exception {
- return new StreamingConfig(new Properties() {
+ private StreamsConfig createConfig(final File baseDir) throws Exception {
+ return new StreamsConfig(new Properties() {
{
- setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
- setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
- setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
- setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+ setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+ setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-task-test");
+ setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+ setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+ setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
}
});
}
@@ -102,7 +103,7 @@ public class StreamTaskTest {
public void testProcessOrder() throws Exception {
File baseDir = Files.createTempDirectory("test").toFile();
try {
- StreamingConfig config = createConfig(baseDir);
+ StreamsConfig config = createConfig(baseDir);
StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
@@ -153,7 +154,7 @@ public class StreamTaskTest {
public void testPauseResume() throws Exception {
File baseDir = Files.createTempDirectory("test").toFile();
try {
- StreamingConfig config = createConfig(baseDir);
+ StreamsConfig config = createConfig(baseDir);
StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
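
The hunks in this test (and the ones that follow) are mechanical: StreamingConfig becomes StreamsConfig with the property keys unchanged, and the tests additionally set the new JOB_ID_CONFIG. A condensed sketch of the migrated configuration, mirroring the properties the hunks above set (values are the tests' own and purely illustrative):

    import org.apache.kafka.streams.StreamsConfig;
    import java.util.Properties;

    public class RenameMigrationSketch {
        static StreamsConfig exampleConfig() {
            Properties props = new Properties();
            props.setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-task-test"); // newly set alongside the rename
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
            props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            return new StreamsConfig(props); // was: new StreamingConfig(props)
        }
    }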
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5f0347d..2d531bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -37,7 +37,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -112,13 +112,14 @@ public class StreamThreadTest {
private Properties configProps() {
return new Properties() {
{
- setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
- setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
- setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+ setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+ setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+ setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-thread-test");
+ setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
}
};
}
@@ -132,7 +133,7 @@ public class StreamThreadTest {
Consumer<byte[], byte[]> consumer,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
- StreamingConfig config) {
+ StreamsConfig config) {
super(id, "jobId", partitions, topology, consumer, producer, restoreConsumer, config, null);
}
@@ -148,7 +149,7 @@ public class StreamThreadTest {
@SuppressWarnings("unchecked")
@Test
public void testPartitionAssignmentChange() throws Exception {
- StreamingConfig config = new StreamingConfig(configProps());
+ StreamsConfig config = new StreamsConfig(configProps());
MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -258,10 +259,10 @@ public class StreamThreadTest {
try {
final long cleanupDelay = 1000L;
Properties props = configProps();
- props.setProperty(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
- props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+ props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay));
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
- StreamingConfig config = new StreamingConfig(props);
+ StreamsConfig config = new StreamsConfig(props);
File stateDir1 = new File(baseDir, task1.toString());
File stateDir2 = new File(baseDir, task2.toString());
@@ -389,10 +390,10 @@ public class StreamThreadTest {
try {
final long commitInterval = 1000L;
Properties props = configProps();
- props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
- props.setProperty(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+ props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
- StreamingConfig config = new StreamingConfig(props);
+ StreamsConfig config = new StreamsConfig(props);
MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -468,9 +469,9 @@ public class StreamThreadTest {
}
}
- private void initPartitionGrouper(StreamingConfig config, StreamThread thread) {
+ private void initPartitionGrouper(StreamsConfig config, StreamThread thread) {
- KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
+ StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread, thread.jobId, thread.clientId));
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index b0c9bd7..36e487b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -23,12 +23,13 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.test.MockProcessorContext;
@@ -221,10 +222,10 @@ public class KeyValueStoreTestDriver<K, V> {
private final Serdes<K, V> serdes;
private final Map<K, V> flushedEntries = new HashMap<>();
private final Set<K> flushedRemovals = new HashSet<>();
- private final List<Entry<K, V>> restorableEntries = new LinkedList<>();
+ private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>();
private final MockProcessorContext context;
private final Map<String, StateStore> storeMap = new HashMap<>();
- private final StreamingMetrics metrics = new StreamingMetrics() {
+ private final StreamsMetrics metrics = new StreamsMetrics() {
@Override
public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
return null;
@@ -248,7 +249,7 @@ public class KeyValueStoreTestDriver<K, V> {
}
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
- StreamPartitioner<K1, V1> partitioner) {
+ StreamsPartitioner<K1, V1> partitioner) {
recordFlushed(record.key(), record.value());
}
};
@@ -256,12 +257,12 @@ public class KeyValueStoreTestDriver<K, V> {
this.stateDir.mkdirs();
Properties props = new Properties();
- props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
- props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass());
- props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
- props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
- props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
+ props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass());
+ props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
+ props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
serdes.valueDeserializer(), recordCollector) {
@@ -287,7 +288,7 @@ public class KeyValueStoreTestDriver<K, V> {
}
@Override
- public StreamingMetrics metrics() {
+ public StreamsMetrics metrics() {
return metrics;
}
@@ -313,10 +314,10 @@ public class KeyValueStoreTestDriver<K, V> {
}
private void restoreEntries(StateRestoreCallback func) {
- for (Entry<K, V> entry : restorableEntries) {
+ for (KeyValue<K, V> entry : restorableEntries) {
if (entry != null) {
- byte[] rawKey = serdes.rawKey(entry.key());
- byte[] rawValue = serdes.rawValue(entry.value());
+ byte[] rawKey = serdes.rawKey(entry.key);
+ byte[] rawValue = serdes.rawValue(entry.value);
func.restore(rawKey, rawValue);
}
}
@@ -352,7 +353,7 @@ public class KeyValueStoreTestDriver<K, V> {
* @see #checkForRestoredEntries(KeyValueStore)
*/
public void addEntryToRestoreLog(K key, V value) {
- restorableEntries.add(new Entry<K, V>(key, value));
+ restorableEntries.add(new KeyValue<K, V>(key, value));
}
/**
@@ -376,7 +377,7 @@ public class KeyValueStoreTestDriver<K, V> {
*
* @return the restore entries; never null but possibly a null iterator
*/
- public Iterable<Entry<K, V>> restoredEntries() {
+ public Iterable<KeyValue<K, V>> restoredEntries() {
return restorableEntries;
}
@@ -390,10 +391,10 @@ public class KeyValueStoreTestDriver<K, V> {
*/
public int checkForRestoredEntries(KeyValueStore<K, V> store) {
int missing = 0;
- for (Entry<K, V> entry : restorableEntries) {
- if (entry != null) {
- V value = store.get(entry.key());
- if (!Objects.equals(value, entry.value())) {
+ for (KeyValue<K, V> kv : restorableEntries) {
+ if (kv != null) {
+ V value = store.get(kv.key);
+ if (!Objects.equals(value, kv.value)) {
++missing;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 2ed698c..8effd77 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -21,8 +21,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.Entry;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
@@ -73,11 +73,11 @@ public abstract class AbstractKeyValueStoreTest {
// Check range iteration ...
try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
while (iter.hasNext()) {
- Entry<Integer, String> entry = iter.next();
- if (entry.key().equals(2))
- assertEquals("two", entry.value());
- else if (entry.key().equals(4))
- assertEquals("four", entry.value());
+ KeyValue<Integer, String> entry = iter.next();
+ if (entry.key.equals(2))
+ assertEquals("two", entry.value);
+ else if (entry.key.equals(4))
+ assertEquals("four", entry.value);
else
fail("Unexpected entry: " + entry);
}
@@ -86,11 +86,11 @@ public abstract class AbstractKeyValueStoreTest {
// Check range iteration ...
try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
while (iter.hasNext()) {
- Entry<Integer, String> entry = iter.next();
- if (entry.key().equals(2))
- assertEquals("two", entry.value());
- else if (entry.key().equals(4))
- assertEquals("four", entry.value());
+ KeyValue<Integer, String> entry = iter.next();
+ if (entry.key.equals(2))
+ assertEquals("two", entry.value);
+ else if (entry.key.equals(4))
+ assertEquals("four", entry.value);
else
fail("Unexpected entry: " + entry);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 80ad67f..45448e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -24,10 +24,10 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Entry;
import org.apache.kafka.streams.state.Serdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -69,12 +69,12 @@ public class RocksDBWindowStoreTest {
public void testPutAndFetch() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
- final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
+ changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
@@ -165,12 +165,12 @@ public class RocksDBWindowStoreTest {
public void testPutAndFetchBefore() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
- final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
+ changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
@@ -261,12 +261,12 @@ public class RocksDBWindowStoreTest {
public void testPutAndFetchAfter() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
- final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
+ changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
@@ -357,12 +357,12 @@ public class RocksDBWindowStoreTest {
public void testPutSameKeyTimestamp() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
- final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
+ changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
@@ -416,12 +416,12 @@ public class RocksDBWindowStoreTest {
public void testRolling() throws IOException {
File baseDir = Files.createTempDirectory("test").toFile();
try {
- final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
+ changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
@@ -528,7 +528,7 @@ public class RocksDBWindowStoreTest {
@Test
public void testRestore() throws IOException {
- final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
long startTime = segmentSize * 2;
long incr = segmentSize / 2;
@@ -538,7 +538,7 @@ public class RocksDBWindowStoreTest {
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
+ changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
@@ -587,7 +587,7 @@ public class RocksDBWindowStoreTest {
RecordCollector recordCollector = new RecordCollector(producer) {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
+ changeLog.add(new KeyValue<>(
keySerializer.serialize(record.topic(), record.key()),
valueSerializer.serialize(record.topic(), record.value()))
);
@@ -655,13 +655,13 @@ public class RocksDBWindowStoreTest {
return set;
}
- private Map<Integer, Set<String>> entriesByKey(List<Entry<byte[], byte[]>> changeLog, long startTime) {
+ private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) {
HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
- for (Entry<byte[], byte[]> entry : changeLog) {
- long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key());
- Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key(), serdes);
- String value = entry.value() == null ? null : serdes.valueFrom(entry.value());
+ for (KeyValue<byte[], byte[]> entry : changeLog) {
+ long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key);
+ Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key, serdes);
+ String value = entry.value == null ? null : serdes.valueFrom(entry.value);
Set<String> entries = entriesByKey.get(key);
if (entries == null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 2dc567e..8f8e00f 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -130,7 +130,7 @@ public class KStreamTestDriver {
@Override
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
- StreamPartitioner<K, V> partitioner) {
+ StreamsPartitioner<K, V> partitioner) {
// The serialization is skipped.
process(record.topic(), record.key(), record.value());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index a6a29cd..cb7a95c 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -18,7 +18,8 @@
package org.apache.kafka.test;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -26,7 +27,6 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Entry;
import java.io.File;
import java.util.Collections;
@@ -123,8 +123,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
}
@Override
- public StreamingMetrics metrics() {
- return new StreamingMetrics() {
+ public StreamsMetrics metrics() {
+ return new StreamsMetrics() {
@Override
public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
return null;
@@ -192,10 +192,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
return Collections.unmodifiableMap(storeMap);
}
- public void restore(String storeName, List<Entry<byte[], byte[]>> changeLog) {
+ public void restore(String storeName, List<KeyValue<byte[], byte[]>> changeLog) {
StateRestoreCallback restoreCallback = restoreFuncs.get(storeName);
- for (Entry<byte[], byte[]> entry : changeLog) {
- restoreCallback.restore(entry.key(), entry.value());
+ for (KeyValue<byte[], byte[]> entry : changeLog) {
+ restoreCallback.restore(entry.key, entry.value);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
index 98aa0d4..828b5ae 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
@@ -17,7 +17,7 @@
package org.apache.kafka.test;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
public class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index eaeed09..af6d51b 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -27,8 +27,8 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicLong;
* can use and test code you already have that uses a builder to create topologies. Best of all, the class works without a real
* Kafka broker, so the tests execute very quickly with very little overhead.
* <p>
- * Using the ProcessorTopologyTestDriver in tests is easy: simply instantiate the driver with a {@link StreamingConfig} and a
+ * Using the ProcessorTopologyTestDriver in tests is easy: simply instantiate the driver with a {@link StreamsConfig} and a
* TopologyBuilder, use the driver to supply an input message to the topology, and then use the driver to read and verify any
* messages output by the topology.
* <p>
@@ -65,7 +65,7 @@ import java.util.concurrent.atomic.AtomicLong;
*
* <h2>Driver setup</h2>
* <p>
- * In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamingConfig}. The
+ * In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamsConfig}. The
* configuration needs to be representative of what you'd supply to the real topology, so that means including several key
* properties. For example, the following code fragment creates a configuration that specifies a local Kafka broker list
* (which is needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values:
@@ -74,13 +74,13 @@ import java.util.concurrent.atomic.AtomicLong;
* StringSerializer strSerializer = new StringSerializer();
* StringDeserializer strDeserializer = new StringDeserializer();
* Properties props = new Properties();
- * props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- * props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
- * props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
- * props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
- * props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
- * props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
- * StreamingConfig config = new StreamingConfig(props);
+ * props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ * props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
+ * props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
+ * props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
+ * props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
+ * props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
+ * StreamsConfig config = new StreamsConfig(props);
* TopologyBuilder builder = ...
* ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
* </pre>
@@ -139,11 +139,11 @@ public class ProcessorTopologyTestDriver {
/**
* Create a new test driver instance.
- * @param config the streaming configuration for the topology
+ * @param config the stream configuration for the topology
* @param builder the topology builder that will be used to create the topology instance
* @param storeNames the optional names of the state stores that are used by the topology
*/
- public ProcessorTopologyTestDriver(StreamingConfig config, TopologyBuilder builder, String... storeNames) {
+ public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) {
id = new TaskId(0, 0);
topology = builder.build(null);
@@ -173,7 +173,7 @@ public class ProcessorTopologyTestDriver {
producer,
restoreStateConsumer,
config,
- new StreamingMetrics() {
+ new StreamsMetrics() {
@Override
public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
return null;
@@ -265,7 +265,7 @@ public class ProcessorTopologyTestDriver {
/**
* Get the {@link StateStore} with the given name. The name should have been supplied via
- * {@link #ProcessorTopologyTestDriver(StreamingConfig, TopologyBuilder, String...) this object's constructor}, and is
+ * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is
* presumed to be used by a Processor within the topology.
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
@@ -281,7 +281,7 @@ public class ProcessorTopologyTestDriver {
/**
* Get the {@link KeyValueStore} with the given name. The name should have been supplied via
- * {@link #ProcessorTopologyTestDriver(StreamingConfig, TopologyBuilder, String...) this object's constructor}, and is
+ * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is
* presumed to be used by a Processor within the topology.
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
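
Picking up from the config and builder created in the javadoc fragment above, a minimal end-to-end sketch of exercising a topology with the renamed driver. It assumes the process(...) and readOutput(...) entry points that the class javadoc describes for supplying input and reading output; the topic names are placeholders:

    ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
    // feed one message into the topology's source topic
    driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);
    // read back whatever the topology wrote to its sink topic and verify it
    ProducerRecord<String, String> record = driver.readOutput("output-topic", strDeserializer, strDeserializer);
    assertEquals("key1", record.key());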
[3/4] kafka git commit: KAFKA-3136: Rename KafkaStreaming to KafkaStreams
Posted by gw...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
deleted file mode 100644
index f14d9d9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-/**
- * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
- * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
- * <p>
- * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it, so
- * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
- * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
- * <p>
- * When a topology is instantiated, each of its sources are assigned a subset of that topic's partitions. That means that only
- * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
- * automatically manage these instances, and adjust when new topology instances are added or removed.
- * <p>
- * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
- * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
- * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
- * determine to which partition each message should be written.
- * <p>
- * To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
- * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
- * for that topic.
- * <p>
- * All StreamPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
- * org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...)
- * @see TopologyBuilder#addSink(String, String, StreamPartitioner, String...)
- */
-public interface StreamPartitioner<K, V> {
-
- /**
- * Determine the partition number for a message with the given key and value and the current number of partitions.
- *
- * @param key the key of the message
- * @param value the value of the message
- * @param numPartitions the total number of partitions
- * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
- */
- Integer partition(K key, V value, int numPartitions);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
new file mode 100644
index 0000000..f8d199d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
+ * <p>
+ * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it,
+ * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you
+ * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics.
+ * <p>
+ * When a topology is instantiated, each of its sources is assigned a subset of its topic's partitions. That means that only
+ * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
+ * automatically manage these instances, and adjust when new topology instances are added or removed.
+ * <p>
+ * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
+ * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
+ * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
+ * determine to which partition each message should be written.
+ * <p>
+ * To do this, create a <code>StreamsPartitioner</code> implementation, and when you build your topology specify that custom partitioner
+ * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...) adding a sink}
+ * for that topic.
+ * <p>
+ * All StreamsPartitioner implementations should be stateless, pure functions so they can be shared across topic and sink nodes.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
+ * org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...)
+ * @see TopologyBuilder#addSink(String, String, StreamsPartitioner, String...)
+ */
+public interface StreamsPartitioner<K, V> {
+
+ /**
+ * Determine the partition number for a message with the given key and value and the current number of partitions.
+ *
+ * @param key the key of the message
+ * @param value the value of the message
+ * @param numPartitions the total number of partitions
+ * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+ */
+ Integer partition(K key, V value, int numPartitions);
+}
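
As a concrete illustration of the renamed contract, a hypothetical implementation (not part of this patch) that keeps all messages sharing a key prefix in one partition and falls back to the default partitioning logic when it has no opinion:

    public class PrefixPartitioner implements StreamsPartitioner<String, String> {
        @Override
        public Integer partition(String key, String value, int numPartitions) {
            if (key == null)
                return null; // null means: use the default partitioning logic
            int dash = key.indexOf('-');
            String prefix = dash < 0 ? key : key.substring(0, dash);
            // stateless and deterministic, so one instance can be shared across sink nodes
            return (prefix.hashCode() & 0x7fffffff) % numPartitions;
        }
    }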
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index d6b63d2..f4e6821 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -46,8 +46,8 @@ import java.util.Set;
* its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
* processes them, and optionally forwards new messages to one or all of its children. Finally, a {@link SinkNode sink}
* is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
- * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreaming}
- * instance that will then {@link org.apache.kafka.streams.KafkaStreaming#start() begin consuming, processing, and producing messages}.
+ * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
+ * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing messages}.
*/
public class TopologyBuilder {
@@ -135,9 +135,9 @@ public class TopologyBuilder {
public final String topic;
private Serializer keySerializer;
private Serializer valSerializer;
- private final StreamPartitioner partitioner;
+ private final StreamsPartitioner partitioner;
- private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner) {
+ private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamsPartitioner partitioner) {
super(name);
this.parents = parents.clone();
this.topic = topic;
@@ -188,9 +188,9 @@ public class TopologyBuilder {
/**
* Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
- * The source will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
- * {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
+ * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
+ * {@link org.apache.kafka.streams.StreamsConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
*
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
@@ -208,11 +208,11 @@ public class TopologyBuilder {
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
* @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
- * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
- * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
@@ -236,18 +236,18 @@ public class TopologyBuilder {
/**
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
- * The sink will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
- * {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
- * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
+ * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+ * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
*
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
- * @see #addSink(String, String, StreamPartitioner, String...)
+ * @see #addSink(String, String, StreamsPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
- * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+ * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
*/
public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
@@ -256,11 +256,11 @@ public class TopologyBuilder {
/**
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using
* the supplied partitioner.
- * The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
- * {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
- * {@link StreamingConfig streaming configuration}.
+ * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+ * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
* <p>
- * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
+ * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
* the named Kafka topic's partitions. Such control is often useful with topologies that use
* {@link #addStateStore(StateStoreSupplier, String...) state stores}
* in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@@ -274,9 +274,9 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
- * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+ * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
*/
- public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) {
+ public final TopologyBuilder addSink(String name, String topic, StreamsPartitioner partitioner, String... parentNames) {
return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames);
}
@@ -284,7 +284,7 @@ public class TopologyBuilder {
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the specified key and value serializers.
* <p>
- * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
+ * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
* the named Kafka topic's partitions. Such control is often useful with topologies that use
* {@link #addStateStore(StateStoreSupplier, String...) state stores}
* in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@@ -293,20 +293,20 @@ public class TopologyBuilder {
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param keySerializer the {@link Serializer key serializer} used when producing messages; may be null if the sink
- * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
- * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param valSerializer the {@link Serializer value serializer} used when producing messages; may be null if the sink
- * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
- * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
- * @see #addSink(String, String, StreamPartitioner, String...)
- * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+ * @see #addSink(String, String, StreamsPartitioner, String...)
+ * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
*/
public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
- return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames);
+ return addSink(name, topic, keySerializer, valSerializer, (StreamsPartitioner) null, parentNames);
}
/**
@@ -316,20 +316,20 @@ public class TopologyBuilder {
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param keySerializer the {@link Serializer key serializer} used when producing messages; may be null if the sink
- * should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
- * {@link StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param valSerializer the {@link Serializer value serializer} used when producing messages; may be null if the sink
- * should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
- * {@link StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param partitioner the function that should be used to determine the partition for each message processed by the sink
* @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
- * @see #addSink(String, String, StreamPartitioner, String...)
+ * @see #addSink(String, String, StreamsPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
*/
- public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
+ public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner, String... parentNames) {
if (nodeFactories.containsKey(name))
throw new TopologyException("Processor " + name + " is already added.");
@@ -589,9 +589,9 @@ public class TopologyBuilder {
/**
* Build the topology for the specified topic group. This is called automatically when passing this builder into the
- * {@link org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)} constructor.
+ * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor.
*
- * @see org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)
+ * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
*/
public ProcessorTopology build(Integer topicGroupId) {
Set<String> nodeGroup;
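
Putting the renamed addSink overloads together, a minimal sketch (topic names, props, and the processor supplier are placeholders; PrefixPartitioner is the hypothetical partitioner sketched after StreamsPartitioner.java above) of a topology whose sink routes through a custom partitioner before the builder is handed to KafkaStreams:

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("source", "input-topic")
           .addProcessor("process", new MyProcessorSupplier(), "source") // hypothetical supplier
           .addSink("sink", "output-topic",
                    new StringSerializer(), new StringSerializer(),
                    new PrefixPartitioner(), "process");
    KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
    streams.start();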
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 569f4e6..ef4c3c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
@@ -48,7 +48,7 @@ public abstract class AbstractTask {
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
- StreamingConfig config,
+ StreamsConfig config,
boolean isStandby) {
this.id = id;
this.partitions = new HashSet<>(partitions);
@@ -57,7 +57,7 @@ public abstract class AbstractTask {
// create the processor state manager
try {
- File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
+ File stateFile = new File(config.getString(StreamsConfig.STATE_DIR_CONFIG), id.toString());
// if partitions is null, this is a standby task
this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
} catch (IOException e) {
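
The File construction above is what gives every task its own state directory under StreamsConfig.STATE_DIR_CONFIG. A standalone sketch of the resulting layout (the "0_1" rendering assumes TaskId.toString() joins group id and partition with an underscore):

    String stateDir = "/tmp/kafka-streams";     // the value of StreamsConfig.STATE_DIR_CONFIG
    TaskId id = new TaskId(0, 1);               // topic group 0, partition 1
    File taskStateDir = new File(stateDir, id.toString());
    System.out.println(taskStateDir.getPath()); // expected: /tmp/kafka-streams/0_1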
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
deleted file mode 100644
index 2734f56..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ /dev/null
@@ -1,483 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
-import org.apache.kafka.streams.processor.internals.assignment.ClientState;
-import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignmentException;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.zookeeper.ZooDefs;
-
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.I0Itec.zkclient.ZkClient;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Configurable {
-
- private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
-
- private StreamThread streamThread;
-
- private int numStandbyReplicas;
- private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
- private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
- private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
- private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
- private Map<TaskId, Set<TopicPartition>> standbyTasks;
-
- // TODO: the following ZK dependency should be removed after KIP-4
- private static final String ZK_TOPIC_PATH = "/brokers/topics";
- private static final String ZK_BROKER_PATH = "/brokers/ids";
- private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
-
- private ZkClient zkClient;
-
- private class ZKStringSerializer implements ZkSerializer {
-
- @Override
- public byte[] serialize(Object data) {
- try {
- return ((String) data).getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new AssertionError(e);
- }
- }
-
- @Override
- public Object deserialize(byte[] bytes) {
- try {
- if (bytes == null)
- return null;
- else
- return new String(bytes, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new AssertionError(e);
- }
- }
- }
-
- private List<Integer> getBrokers() {
- List<Integer> brokers = new ArrayList<>();
- for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
- brokers.add(Integer.parseInt(broker));
- }
- Collections.sort(brokers);
-
- log.debug("Read brokers {} from ZK in partition assignor.", brokers);
-
- return brokers;
- }
-
- @SuppressWarnings("unchecked")
- private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
- String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
-
- if (data == null) return null;
-
- try {
- ObjectMapper mapper = new ObjectMapper();
-
- Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
-
- });
-
- Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
-
- log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
-
- return partitions;
- } catch (IOException e) {
- throw new KafkaException(e);
- }
- }
-
- private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
- log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
-
- // we always assign leaders to brokers starting at the first one with replication factor 1
- List<Integer> brokers = getBrokers();
-
- Map<Integer, List<Integer>> assignment = new HashMap<>();
- for (int i = 0; i < numPartitions; i++) {
- assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size())));
- }
-
- // try to write to ZK with open ACL
- try {
- Map<String, Object> dataMap = new HashMap<>();
- dataMap.put("version", 1);
- dataMap.put("partitions", assignment);
-
- ObjectMapper mapper = new ObjectMapper();
- String data = mapper.writeValueAsString(dataMap);
-
- zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
- } catch (JsonProcessingException e) {
- throw new KafkaException(e);
- }
- }
-
- private void deleteTopic(String topic) throws ZkNodeExistsException {
- log.debug("Deleting topic {} from ZK in partition assignor.", topic);
-
- zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
- }
-
- private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
- log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
-
- // we always assign new leaders to brokers starting at the last broker of the existing assignment with replication factor 1
- List<Integer> brokers = getBrokers();
-
- int startIndex = existingAssignment.size();
-
- Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
-
- for (int i = 0; i < numPartitions; i++) {
- newAssignment.put(i + startIndex, Collections.singletonList(brokers.get(i + startIndex) % brokers.size()));
- }
-
- // try to write to ZK with open ACL
- try {
- Map<String, Object> dataMap = new HashMap<>();
- dataMap.put("version", 1);
- dataMap.put("partitions", newAssignment);
-
- ObjectMapper mapper = new ObjectMapper();
- String data = mapper.writeValueAsString(dataMap);
-
- zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
- } catch (JsonProcessingException e) {
- throw new KafkaException(e);
- }
- }
-
- /**
- * We need to have the PartitionAssignor and its StreamThread to be mutually accessible
- * since the former needs later's cached metadata while sending subscriptions,
- * and the latter needs former's returned assignment when adding tasks.
- */
- @Override
- public void configure(Map<String, ?> configs) {
- numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
-
- Object o = configs.get(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE);
- if (o == null) {
- KafkaException ex = new KafkaException("StreamThread is not specified");
- log.error(ex.getMessage(), ex);
- throw ex;
- }
-
- if (!(o instanceof StreamThread)) {
- KafkaException ex = new KafkaException(o.getClass().getName() + " is not an instance of " + StreamThread.class.getName());
- log.error(ex.getMessage(), ex);
- throw ex;
- }
-
- streamThread = (StreamThread) o;
- streamThread.partitionAssignor(this);
-
- this.topicGroups = streamThread.builder.topicGroups();
-
- if (configs.containsKey(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG))
- zkClient = new ZkClient((String) configs.get(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer());
- }
-
- @Override
- public String name() {
- return "streaming";
- }
-
- @Override
- public Subscription subscription(Set<String> topics) {
- // Adds the following information to subscription
- // 1. Client UUID (a unique id assigned to an instance of KafkaStreaming)
- // 2. Task ids of previously running tasks
- // 3. Task ids of valid local states on the client's state directory.
-
- Set<TaskId> prevTasks = streamThread.prevTasks();
- Set<TaskId> standbyTasks = streamThread.cachedTasks();
- standbyTasks.removeAll(prevTasks);
- SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
-
- return new Subscription(new ArrayList<>(topics), data.encode());
- }
-
- @Override
- public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
- // This assigns tasks to consumer clients in two steps.
- // 1. using TaskAssignor tasks are assigned to streaming clients.
- // - Assign a task to a client which was running it previously.
- // If there is no such client, assign a task to a client which has its valid local state.
- // - A client may have more than one stream threads.
- // The assignor tries to assign tasks to a client proportionally to the number of threads.
- // - We try not to assign the same set of tasks to two different clients
- // We do the assignment in one-pass. The result may not satisfy above all.
- // 2. within each client, tasks are assigned to consumer clients in round-robin manner.
- Map<UUID, Set<String>> consumersByClient = new HashMap<>();
- Map<UUID, ClientState<TaskId>> states = new HashMap<>();
-
- // Decode subscription info
- for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
- String consumerId = entry.getKey();
- Subscription subscription = entry.getValue();
-
- SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
-
- Set<String> consumers = consumersByClient.get(info.processId);
- if (consumers == null) {
- consumers = new HashSet<>();
- consumersByClient.put(info.processId, consumers);
- }
- consumers.add(consumerId);
-
- ClientState<TaskId> state = states.get(info.processId);
- if (state == null) {
- state = new ClientState<>();
- states.put(info.processId, state);
- }
-
- state.prevActiveTasks.addAll(info.prevTasks);
- state.prevAssignedTasks.addAll(info.prevTasks);
- state.prevAssignedTasks.addAll(info.standbyTasks);
- state.capacity = state.capacity + 1d;
- }
-
- // get the tasks as partition groups from the partition grouper
- Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
- sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
- }
- Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
-
- // add tasks to state topic subscribers
- stateChangelogTopicToTaskIds = new HashMap<>();
- internalSourceTopicToTaskIds = new HashMap<>();
- for (TaskId task : partitionsForTask.keySet()) {
- for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics) {
- Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName);
- if (tasks == null) {
- tasks = new HashSet<>();
- stateChangelogTopicToTaskIds.put(stateName, tasks);
- }
-
- tasks.add(task);
- }
-
- for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) {
- Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName);
- if (tasks == null) {
- tasks = new HashSet<>();
- internalSourceTopicToTaskIds.put(topicName, tasks);
- }
-
- tasks.add(task);
- }
- }
-
- // assign tasks to clients
- states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
- Map<String, Assignment> assignment = new HashMap<>();
-
- for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
- UUID processId = entry.getKey();
- Set<String> consumers = entry.getValue();
- ClientState<TaskId> state = states.get(processId);
-
- ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
- final int numActiveTasks = state.activeTasks.size();
- for (TaskId taskId : state.activeTasks) {
- taskIds.add(taskId);
- }
- for (TaskId id : state.assignedTasks) {
- if (!state.activeTasks.contains(id))
- taskIds.add(id);
- }
-
- final int numConsumers = consumers.size();
- List<TaskId> active = new ArrayList<>();
- Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
-
- int i = 0;
- for (String consumer : consumers) {
- List<TopicPartition> activePartitions = new ArrayList<>();
-
- final int numTaskIds = taskIds.size();
- for (int j = i; j < numTaskIds; j += numConsumers) {
- TaskId taskId = taskIds.get(j);
- if (j < numActiveTasks) {
- for (TopicPartition partition : partitionsForTask.get(taskId)) {
- activePartitions.add(partition);
- active.add(taskId);
- }
- } else {
- Set<TopicPartition> standbyPartitions = standby.get(taskId);
- if (standbyPartitions == null) {
- standbyPartitions = new HashSet<>();
- standby.put(taskId, standbyPartitions);
- }
- standbyPartitions.addAll(partitionsForTask.get(taskId));
- }
- }
-
- AssignmentInfo data = new AssignmentInfo(active, standby);
- assignment.put(consumer, new Assignment(activePartitions, data.encode()));
- i++;
-
- active.clear();
- standby.clear();
- }
- }
-
- // if ZK is specified, get the tasks / internal topics for each state topic and validate the topic partitions
- if (zkClient != null) {
- log.debug("Starting to validate changelog topics in partition assignor.");
-
- Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
- topicToTaskIds.putAll(stateChangelogTopicToTaskIds);
- topicToTaskIds.putAll(internalSourceTopicToTaskIds);
-
- for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
- String topic = streamThread.jobId + "-" + entry.getKey();
-
- // the expected number of partitions is the max value of TaskId.partition + 1
- int numPartitions = 0;
- for (TaskId task : entry.getValue()) {
- if (numPartitions < task.partition + 1)
- numPartitions = task.partition + 1;
- }
-
- boolean topicNotReady = true;
-
- while (topicNotReady) {
- Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
-
- // if topic does not exist, create it
- if (topicMetadata == null) {
- try {
- createTopic(topic, numPartitions);
- } catch (ZkNodeExistsException e) {
- // ignore and continue
- }
- } else {
- if (topicMetadata.size() > numPartitions) {
- // else if topic exists with more #.partitions than needed, delete in order to re-create it
- try {
- deleteTopic(topic);
- } catch (ZkNodeExistsException e) {
- // ignore and continue
- }
- } else if (topicMetadata.size() < numPartitions) {
- // else if topic exists with less #.partitions than needed, add partitions
- try {
- addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
- } catch (ZkNoNodeException e) {
- // ignore and continue
- }
- }
-
- topicNotReady = false;
- }
- }
-
- // wait until the topic metadata has been propagated to all brokers
- List<PartitionInfo> partitions;
- do {
- partitions = streamThread.restoreConsumer.partitionsFor(topic);
- } while (partitions == null || partitions.size() != numPartitions);
- }
-
- log.info("Completed validating changelog topics in partition assignor.");
- }
-
- return assignment;
- }
-
- @Override
- public void onAssignment(Assignment assignment) {
- List<TopicPartition> partitions = assignment.partitions();
-
- AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
- this.standbyTasks = info.standbyTasks;
-
- Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
- Iterator<TaskId> iter = info.activeTasks.iterator();
- for (TopicPartition partition : partitions) {
- Set<TaskId> taskIds = partitionToTaskIds.get(partition);
- if (taskIds == null) {
- taskIds = new HashSet<>();
- partitionToTaskIds.put(partition, taskIds);
- }
-
- if (iter.hasNext()) {
- taskIds.add(iter.next());
- } else {
- TaskAssignmentException ex = new TaskAssignmentException(
- "failed to find a task id for the partition=" + partition.toString() +
- ", partitions=" + partitions.size() + ", assignmentInfo=" + info.toString()
- );
- log.error(ex.getMessage(), ex);
- throw ex;
- }
- }
- this.partitionToTaskIds = partitionToTaskIds;
- }
-
- /* For Test Only */
- public Set<TaskId> tasksForState(String stateName) {
- return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
- }
-
- public Set<TaskId> tasksForPartition(TopicPartition partition) {
- return partitionToTaskIds.get(partition);
- }
-
- public Map<TaskId, Set<TopicPartition>> standbyTasks() {
- return standbyTasks;
- }
-}
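
The per-client round-robin in assign() above is easiest to see with small numbers. A standalone sketch (task names are made up) of how consumer i takes entries i, i + numConsumers, i + 2 * numConsumers, ... from the active-then-standby task list:

    List<String> taskIds = Arrays.asList("t0", "t1", "t2", "t3", "s0", "s1"); // 4 active, 2 standby
    int numActiveTasks = 4;
    int numConsumers = 2;
    for (int i = 0; i < numConsumers; i++) {
        StringBuilder line = new StringBuilder("consumer " + i + ":");
        for (int j = i; j < taskIds.size(); j += numConsumers) {
            line.append(' ').append(taskIds.get(j))
                .append(j < numActiveTasks ? "(active)" : "(standby)");
        }
        System.out.println(line);
    }
    // prints:
    //   consumer 0: t0(active) t2(active) s0(standby)
    //   consumer 1: t1(active) t3(active) s1(standby)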
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 3429df3..102c534 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -38,7 +38,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
private final TaskId id;
private final StreamTask task;
- private final StreamingMetrics metrics;
+ private final StreamsMetrics metrics;
private final RecordCollector collector;
private final ProcessorStateManager stateMgr;
@@ -52,10 +52,10 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
@SuppressWarnings("unchecked")
public ProcessorContextImpl(TaskId id,
StreamTask task,
- StreamingConfig config,
+ StreamsConfig config,
RecordCollector collector,
ProcessorStateManager stateMgr,
- StreamingMetrics metrics) {
+ StreamsMetrics metrics) {
this.id = id;
this.task = task;
this.metrics = metrics;
@@ -113,7 +113,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
}
@Override
- public StreamingMetrics metrics() {
+ public StreamsMetrics metrics() {
return metrics;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index fe0472e..25c663d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +72,7 @@ public class RecordCollector {
}
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
- StreamPartitioner<K, V> partitioner) {
+ StreamsPartitioner<K, V> partitioner) {
byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
Integer partition = null;
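
The rest of send(...) is cut off in this excerpt. As a plausible sketch only (an assumption, not the patch's actual body), a partitioner hook like this is typically consulted before the record goes to the producer:

    if (partitioner != null) {
        // ask the producer for the topic's current partitions, then let the
        // user-supplied StreamsPartitioner pick one; null keeps the default
        List<PartitionInfo> infos = this.producer.partitionsFor(record.topic());
        if (infos != null)
            partition = partitioner.partition(record.key(), record.value(), infos.size());
    }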
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 7ab59ee..88b3f56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -20,18 +20,18 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
public class SinkNode<K, V> extends ProcessorNode<K, V> {
private final String topic;
private Serializer<K> keySerializer;
private Serializer<V> valSerializer;
- private final StreamPartitioner<K, V> partitioner;
+ private final StreamsPartitioner<K, V> partitioner;
private ProcessorContext context;
- public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner) {
+ public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner) {
super(name);
this.topic = topic;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index e0583e3..89d185c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -36,7 +36,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class);
private final TaskId id;
- private final StreamingMetrics metrics;
+ private final StreamsMetrics metrics;
private final ProcessorStateManager stateMgr;
private final Serializer<?> keySerializer;
@@ -47,9 +47,9 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
private boolean initialized;
public StandbyContextImpl(TaskId id,
- StreamingConfig config,
+ StreamsConfig config,
ProcessorStateManager stateMgr,
- StreamingMetrics metrics) {
+ StreamsMetrics metrics) {
this.id = id;
this.metrics = metrics;
this.stateMgr = stateMgr;
@@ -105,7 +105,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
}
@Override
- public StreamingMetrics metrics() {
+ public StreamsMetrics metrics() {
return metrics;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 4cc4ea4..861b830 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -21,8 +21,8 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,8 +50,8 @@ public class StandbyTask extends AbstractTask {
* @param topology the instance of {@link ProcessorTopology}
* @param consumer the instance of {@link Consumer}
* @param restoreConsumer the instance of {@link Consumer} used when restoring state
- * @param config the {@link StreamingConfig} specified by the user
- * @param metrics the {@link StreamingMetrics} created by the thread
+ * @param config the {@link StreamsConfig} specified by the user
+ * @param metrics the {@link StreamsMetrics} created by the thread
*/
public StandbyTask(TaskId id,
String jobId,
@@ -59,8 +59,8 @@ public class StandbyTask extends AbstractTask {
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
- StreamingConfig config,
- StreamingMetrics metrics) {
+ StreamsConfig config,
+ StreamsMetrics metrics) {
super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
// initialize the topology with its own context
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
new file mode 100644
index 0000000..5d87e5a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.ClientState;
+import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignmentException;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zookeeper.ZooDefs;
+
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.I0Itec.zkclient.ZkClient;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
+
+ private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
+
+ private StreamThread streamThread;
+
+ private int numStandbyReplicas;
+ private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
+ private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
+ private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
+ private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
+ private Map<TaskId, Set<TopicPartition>> standbyTasks;
+
+ // TODO: the following ZK dependency should be removed after KIP-4
+ private static final String ZK_TOPIC_PATH = "/brokers/topics";
+ private static final String ZK_BROKER_PATH = "/brokers/ids";
+ private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
+
+ private ZkClient zkClient;
+
+ private class ZKStringSerializer implements ZkSerializer {
+
+ @Override
+ public byte[] serialize(Object data) {
+ try {
+ return ((String) data).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes) {
+ try {
+ if (bytes == null)
+ return null;
+ else
+ return new String(bytes, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new AssertionError(e);
+ }
+ }
+ }
+
+ private List<Integer> getBrokers() {
+ List<Integer> brokers = new ArrayList<>();
+ for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
+ brokers.add(Integer.parseInt(broker));
+ }
+ Collections.sort(brokers);
+
+ log.debug("Read brokers {} from ZK in partition assignor.", brokers);
+
+ return brokers;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
+ String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
+
+ if (data == null) return null;
+
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+
+ Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
+
+ });
+
+ Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
+
+ log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
+
+ return partitions;
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
+ log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
+
+ // we always assign leaders to brokers starting at the first one, with replication factor 1
+ List<Integer> brokers = getBrokers();
+
+ Map<Integer, List<Integer>> assignment = new HashMap<>();
+ for (int i = 0; i < numPartitions; i++) {
+ assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size())));
+ }
+
+ // try to write to ZK with open ACL
+ try {
+ Map<String, Object> dataMap = new HashMap<>();
+ dataMap.put("version", 1);
+ dataMap.put("partitions", assignment);
+
+ ObjectMapper mapper = new ObjectMapper();
+ String data = mapper.writeValueAsString(dataMap);
+
+ zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ } catch (JsonProcessingException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ private void deleteTopic(String topic) throws ZkNodeExistsException {
+ log.debug("Deleting topic {} from ZK in partition assignor.", topic);
+
+ zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
+ log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
+
+ // we always assign new leaders to brokers starting after the last broker of the existing assignment, with replication factor 1
+ List<Integer> brokers = getBrokers();
+
+ int startIndex = existingAssignment.size();
+
+ Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
+
+ for (int i = 0; i < numPartitions; i++) {
+ newAssignment.put(i + startIndex, Collections.singletonList(brokers.get((i + startIndex) % brokers.size())));
+ }
+
+ // try to write to ZK with open ACL
+ try {
+ Map<String, Object> dataMap = new HashMap<>();
+ dataMap.put("version", 1);
+ dataMap.put("partitions", newAssignment);
+
+ ObjectMapper mapper = new ObjectMapper();
+ String data = mapper.writeValueAsString(dataMap);
+
+ zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
+ } catch (JsonProcessingException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ /**
+ * The PartitionAssignor and its StreamThread need to be mutually accessible,
+ * since the former needs the latter's cached metadata while sending subscriptions,
+ * and the latter needs the former's returned assignment when adding tasks.
+ */
+ @Override
+ public void configure(Map<String, ?> configs) {
+ numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+
+ Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
+ if (o == null) {
+ KafkaException ex = new KafkaException("StreamThread is not specified");
+ log.error(ex.getMessage(), ex);
+ throw ex;
+ }
+
+ if (!(o instanceof StreamThread)) {
+ KafkaException ex = new KafkaException(o.getClass().getName() + " is not an instance of " + StreamThread.class.getName());
+ log.error(ex.getMessage(), ex);
+ throw ex;
+ }
+
+ streamThread = (StreamThread) o;
+ streamThread.partitionAssignor(this);
+
+ this.topicGroups = streamThread.builder.topicGroups();
+
+ if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG))
+ zkClient = new ZkClient((String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer());
+ }
+
+ @Override
+ public String name() {
+ return "stream";
+ }
+
+ @Override
+ public Subscription subscription(Set<String> topics) {
+ // Adds the following information to the subscription:
+ // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
+ // 2. Task ids of previously running tasks
+ // 3. Task ids of valid local states in the client's state directory.
+
+ Set<TaskId> prevTasks = streamThread.prevTasks();
+ Set<TaskId> standbyTasks = streamThread.cachedTasks();
+ standbyTasks.removeAll(prevTasks);
+ SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
+
+ return new Subscription(new ArrayList<>(topics), data.encode());
+ }
+
+ @Override
+ public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
+ // This assigns tasks to consumer clients in two steps.
+ // 1. using TaskAssignor to assign tasks to consumer clients.
+ // - Assign a task to a client which was running it previously.
+ // If there is no such client, assign a task to a client which has its valid local state.
+ // - A client may have more than one stream thread.
+ // The assignor tries to assign tasks to a client proportionally to its number of threads.
+ // - We try not to assign the same set of tasks to two different clients.
+ // We do the assignment in one pass, so the result may not satisfy all of the above.
+ // 2. within each client, tasks are assigned to its consumers in a round-robin manner.
+ Map<UUID, Set<String>> consumersByClient = new HashMap<>();
+ Map<UUID, ClientState<TaskId>> states = new HashMap<>();
+
+ // Decode subscription info
+ for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
+ String consumerId = entry.getKey();
+ Subscription subscription = entry.getValue();
+
+ SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+
+ Set<String> consumers = consumersByClient.get(info.processId);
+ if (consumers == null) {
+ consumers = new HashSet<>();
+ consumersByClient.put(info.processId, consumers);
+ }
+ consumers.add(consumerId);
+
+ ClientState<TaskId> state = states.get(info.processId);
+ if (state == null) {
+ state = new ClientState<>();
+ states.put(info.processId, state);
+ }
+
+ state.prevActiveTasks.addAll(info.prevTasks);
+ state.prevAssignedTasks.addAll(info.prevTasks);
+ state.prevAssignedTasks.addAll(info.standbyTasks);
+ state.capacity = state.capacity + 1d;
+ }
+
+ // get the tasks as partition groups from the partition grouper
+ Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
+ for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
+ }
+ Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
+
+ // add tasks to state topic subscribers
+ stateChangelogTopicToTaskIds = new HashMap<>();
+ internalSourceTopicToTaskIds = new HashMap<>();
+ for (TaskId task : partitionsForTask.keySet()) {
+ for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics) {
+ Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName);
+ if (tasks == null) {
+ tasks = new HashSet<>();
+ stateChangelogTopicToTaskIds.put(stateName, tasks);
+ }
+
+ tasks.add(task);
+ }
+
+ for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) {
+ Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName);
+ if (tasks == null) {
+ tasks = new HashSet<>();
+ internalSourceTopicToTaskIds.put(topicName, tasks);
+ }
+
+ tasks.add(task);
+ }
+ }
+
+ // assign tasks to clients
+ states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
+ Map<String, Assignment> assignment = new HashMap<>();
+
+ for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
+ UUID processId = entry.getKey();
+ Set<String> consumers = entry.getValue();
+ ClientState<TaskId> state = states.get(processId);
+
+ ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
+ final int numActiveTasks = state.activeTasks.size();
+ for (TaskId taskId : state.activeTasks) {
+ taskIds.add(taskId);
+ }
+ for (TaskId id : state.assignedTasks) {
+ if (!state.activeTasks.contains(id))
+ taskIds.add(id);
+ }
+
+ final int numConsumers = consumers.size();
+ List<TaskId> active = new ArrayList<>();
+ Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
+
+ int i = 0;
+ for (String consumer : consumers) {
+ List<TopicPartition> activePartitions = new ArrayList<>();
+
+ final int numTaskIds = taskIds.size();
+ for (int j = i; j < numTaskIds; j += numConsumers) {
+ TaskId taskId = taskIds.get(j);
+ if (j < numActiveTasks) {
+ for (TopicPartition partition : partitionsForTask.get(taskId)) {
+ activePartitions.add(partition);
+ active.add(taskId);
+ }
+ } else {
+ Set<TopicPartition> standbyPartitions = standby.get(taskId);
+ if (standbyPartitions == null) {
+ standbyPartitions = new HashSet<>();
+ standby.put(taskId, standbyPartitions);
+ }
+ standbyPartitions.addAll(partitionsForTask.get(taskId));
+ }
+ }
+
+ AssignmentInfo data = new AssignmentInfo(active, standby);
+ assignment.put(consumer, new Assignment(activePartitions, data.encode()));
+ i++;
+
+ active.clear();
+ standby.clear();
+ }
+ }
+
+ // if ZK is specified, validate the state changelog topics and internal source topics needed by the assigned tasks
+ if (zkClient != null) {
+ log.debug("Starting to validate changelog topics in partition assignor.");
+
+ Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
+ topicToTaskIds.putAll(stateChangelogTopicToTaskIds);
+ topicToTaskIds.putAll(internalSourceTopicToTaskIds);
+
+ for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
+ String topic = streamThread.jobId + "-" + entry.getKey();
+
+ // the expected number of partitions is the max value of TaskId.partition + 1
+ int numPartitions = 0;
+ for (TaskId task : entry.getValue()) {
+ if (numPartitions < task.partition + 1)
+ numPartitions = task.partition + 1;
+ }
+
+ boolean topicNotReady = true;
+
+ while (topicNotReady) {
+ Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
+
+ // if topic does not exist, create it
+ if (topicMetadata == null) {
+ try {
+ createTopic(topic, numPartitions);
+ } catch (ZkNodeExistsException e) {
+ // ignore and continue
+ }
+ } else {
+ if (topicMetadata.size() > numPartitions) {
+ // else if topic exists with more #.partitions than needed, delete in order to re-create it
+ try {
+ deleteTopic(topic);
+ } catch (ZkNodeExistsException e) {
+ // ignore and continue
+ }
+ } else if (topicMetadata.size() < numPartitions) {
+ // else if topic exists with less #.partitions than needed, add partitions
+ try {
+ addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
+ } catch (ZkNoNodeException e) {
+ // ignore and continue
+ }
+ }
+
+ topicNotReady = false;
+ }
+ }
+
+ // wait until the topic metadata has been propagated to all brokers
+ List<PartitionInfo> partitions;
+ do {
+ partitions = streamThread.restoreConsumer.partitionsFor(topic);
+ } while (partitions == null || partitions.size() != numPartitions);
+ }
+
+ log.info("Completed validating changelog topics in partition assignor.");
+ }
+
+ return assignment;
+ }
+
+ @Override
+ public void onAssignment(Assignment assignment) {
+ List<TopicPartition> partitions = assignment.partitions();
+
+ AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+ this.standbyTasks = info.standbyTasks;
+
+ Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
+ Iterator<TaskId> iter = info.activeTasks.iterator();
+ for (TopicPartition partition : partitions) {
+ Set<TaskId> taskIds = partitionToTaskIds.get(partition);
+ if (taskIds == null) {
+ taskIds = new HashSet<>();
+ partitionToTaskIds.put(partition, taskIds);
+ }
+
+ if (iter.hasNext()) {
+ taskIds.add(iter.next());
+ } else {
+ TaskAssignmentException ex = new TaskAssignmentException(
+ "failed to find a task id for the partition=" + partition.toString() +
+ ", partitions=" + partitions.size() + ", assignmentInfo=" + info.toString()
+ );
+ log.error(ex.getMessage(), ex);
+ throw ex;
+ }
+ }
+ this.partitionToTaskIds = partitionToTaskIds;
+ }
+
+ /* For Test Only */
+ public Set<TaskId> tasksForState(String stateName) {
+ return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+ }
+
+ public Set<TaskId> tasksForPartition(TopicPartition partition) {
+ return partitionToTaskIds.get(partition);
+ }
+
+ public Map<TaskId, Set<TopicPartition>> standbyTasks() {
+ return standbyTasks;
+ }
+}
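Step 2 of assign() above interleaves a client's task ids over its consumers with a stride equal to the consumer count. A self-contained sketch of that distribution, using made-up task ids:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RoundRobinSketch {
        public static void main(String[] args) {
            List<String> taskIds = Arrays.asList("0_0", "0_1", "0_2", "1_0", "1_1");
            int numConsumers = 2;
            for (int i = 0; i < numConsumers; i++) {
                List<String> assigned = new ArrayList<>();
                // same stride-by-numConsumers walk as in assign()
                for (int j = i; j < taskIds.size(); j += numConsumers)
                    assigned.add(taskIds.get(j));
                System.out.println("consumer-" + i + " -> " + assigned);
            }
            // prints:
            // consumer-0 -> [0_0, 0_2, 1_1]
            // consumer-1 -> [0_1, 1_0]
        }
    }

Because active tasks are placed at the front of the list, walking it with this stride also tends to spread the active work evenly before the standby tasks are handed out.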
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2e58ad5..ec3d011 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
@@ -67,8 +67,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
* @param consumer the instance of {@link Consumer}
* @param producer the instance of {@link Producer}
* @param restoreConsumer the instance of {@link Consumer} used when restoring state
- * @param config the {@link StreamingConfig} specified by the user
- * @param metrics the {@link StreamingMetrics} created by the thread
+ * @param config the {@link StreamsConfig} specified by the user
+ * @param metrics the {@link StreamsMetrics} created by the thread
*/
public StreamTask(TaskId id,
String jobId,
@@ -77,11 +77,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
Consumer<byte[], byte[]> consumer,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
- StreamingConfig config,
- StreamingMetrics metrics) {
+ StreamsConfig config,
+ StreamsMetrics metrics) {
super(id, jobId, partitions, topology, consumer, restoreConsumer, config, false);
this.punctuationQueue = new PunctuationQueue();
- this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+ this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
// create queues for each assigned partition and associate them
// to corresponding source nodes in the processor topology
@@ -93,7 +93,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
partitionQueues.put(partition, queue);
}
- TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+ TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
// initialize the consumed offset cache
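The two settings StreamTask reads here are supplied under their new StreamsConfig names. A minimal configuration sketch, mirroring the setUp() of StreamsConfigTest further below (values are examples only):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.examples.WallclockTimestampExtractor;

    Properties props = new Properties();
    props.put(StreamsConfig.JOB_ID_CONFIG, "example-job");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 1000);
    StreamsConfig config = new StreamsConfig(props);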
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 578357a..e5d0922 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -39,8 +39,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -67,14 +67,14 @@ import java.util.concurrent.atomic.AtomicInteger;
public class StreamThread extends Thread {
private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
- private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1);
+ private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
public final PartitionGrouper partitionGrouper;
public final String jobId;
public final String clientId;
public final UUID processId;
- protected final StreamingConfig config;
+ protected final StreamsConfig config;
protected final TopologyBuilder builder;
protected final Set<String> sourceTopics;
protected final Producer<byte[], byte[]> producer;
@@ -93,9 +93,9 @@ public class StreamThread extends Thread {
private final long cleanTimeMs;
private final long commitTimeMs;
private final long totalRecordsToProcess;
- private final StreamingMetricsImpl sensors;
+ private final StreamsMetricsImpl sensors;
- private KafkaStreamingPartitionAssignor partitionAssignor = null;
+ private StreamPartitionAssignor partitionAssignor = null;
private long lastClean;
private long lastCommit;
@@ -122,17 +122,17 @@ public class StreamThread extends Thread {
};
public StreamThread(TopologyBuilder builder,
- StreamingConfig config,
+ StreamsConfig config,
String jobId,
String clientId,
UUID processId,
Metrics metrics,
- Time time) throws Exception {
+ Time time) {
this(builder, config, null, null, null, jobId, clientId, processId, metrics, time);
}
StreamThread(TopologyBuilder builder,
- StreamingConfig config,
+ StreamsConfig config,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
@@ -140,8 +140,8 @@ public class StreamThread extends Thread {
String clientId,
UUID processId,
Metrics metrics,
- Time time) throws Exception {
- super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement());
+ Time time) {
+ super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
this.jobId = jobId;
this.config = config;
@@ -149,7 +149,7 @@ public class StreamThread extends Thread {
this.sourceTopics = builder.sourceTopics();
this.clientId = clientId;
this.processId = processId;
- this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
+ this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
// set the producer and consumer clients
this.producer = (producer != null) ? producer : createProducer();
@@ -167,24 +167,24 @@ public class StreamThread extends Thread {
this.standbyRecords = new HashMap<>();
// read in task specific config values
- this.stateDir = new File(this.config.getString(StreamingConfig.STATE_DIR_CONFIG));
+ this.stateDir = new File(this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
this.stateDir.mkdir();
- this.pollTimeMs = config.getLong(StreamingConfig.POLL_MS_CONFIG);
- this.commitTimeMs = config.getLong(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG);
- this.cleanTimeMs = config.getLong(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
- this.totalRecordsToProcess = config.getLong(StreamingConfig.TOTAL_RECORDS_TO_PROCESS);
+ this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
+ this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
+ this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
+ this.totalRecordsToProcess = config.getLong(StreamsConfig.TOTAL_RECORDS_TO_PROCESS);
this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
this.lastCommit = time.milliseconds();
this.recordsProcessed = 0;
this.time = time;
- this.sensors = new StreamingMetricsImpl(metrics);
+ this.sensors = new StreamsMetricsImpl(metrics);
this.running = new AtomicBoolean(true);
}
- public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) {
+ public void partitionAssignor(StreamPartitionAssignor partitionAssignor) {
this.partitionAssignor = partitionAssignor;
}
@@ -227,7 +227,7 @@ public class StreamThread extends Thread {
}
/**
- * Shutdown this streaming thread.
+ * Shut down this stream thread.
*/
public void close() {
running.set(false);
@@ -673,7 +673,7 @@ public class StreamThread extends Thread {
}
}
- private class StreamingMetricsImpl implements StreamingMetrics {
+ private class StreamsMetricsImpl implements StreamsMetrics {
final Metrics metrics;
final String metricGrpName;
final Map<String, String> metricTags;
@@ -685,10 +685,10 @@ public class StreamThread extends Thread {
final Sensor taskCreationSensor;
final Sensor taskDestructionSensor;
- public StreamingMetricsImpl(Metrics metrics) {
+ public StreamsMetricsImpl(Metrics metrics) {
this.metrics = metrics;
- this.metricGrpName = "streaming-metrics";
+ this.metricGrpName = "stream-metrics";
this.metricTags = new LinkedHashMap<>();
this.metricTags.put("client-id", clientId + "-" + getName());
@@ -734,7 +734,7 @@ public class StreamThread extends Thread {
for (int i = 0; i < tags.length; i += 2)
tagMap.put(tags[i], tags[i + 1]);
- String metricGroupName = "streaming-" + scopeName + "-metrics";
+ String metricGroupName = "stream-" + scopeName + "-metrics";
// first add the global operation metrics if not yet, with the global tags only
Sensor parent = metrics.sensor(scopeName + "-" + operationName);
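The varargs in the hunk above arrive as alternating key/value pairs and are folded into a per-scope group name of the new "stream-<scope>-metrics" form. A small sketch of that convention, assuming callers pass an even number of tag entries ("rocksdb-state" is a made-up scope name):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class TagSketch {
        // alternating key/value varargs, as consumed by the sensor-registration code above
        static Map<String, String> toTagMap(String... tags) {
            Map<String, String> tagMap = new LinkedHashMap<>();
            for (int i = 0; i < tags.length; i += 2)
                tagMap.put(tags[i], tags[i + 1]);
            return tagMap;
        }

        public static void main(String[] args) {
            System.out.println(toTagMap("client-id", "client-StreamThread-1"));
            // {client-id=client-StreamThread-1}
            System.out.println("stream-" + "rocksdb-state" + "-metrics");
            // stream-rocksdb-state-metrics (was streaming-rocksdb-state-metrics)
        }
    }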
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java b/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
deleted file mode 100644
index 183b691..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-public class Entry<K, V> {
-
- private final K key;
- private final V value;
-
- public Entry(K key, V value) {
- this.key = key;
- this.value = value;
- }
-
- public K key() {
- return key;
- }
-
- public V value() {
- return value;
- }
-
- public String toString() {
- return "Entry(" + key() + ", " + value() + ")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
index 0fbd4ae..bd118a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -19,10 +19,12 @@
package org.apache.kafka.streams.state;
+import org.apache.kafka.streams.KeyValue;
+
import java.io.Closeable;
import java.util.Iterator;
-public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>>, Closeable {
+public interface KeyValueIterator<K, V> extends Iterator<KeyValue<K, V>>, Closeable {
@Override
public void close();
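Since the iterator now yields KeyValue and still extends Closeable, a range scan reads the public key/value fields and cleans up via try-with-resources. A hedged usage sketch; "store" stands in for a KeyValueStore<String, Long> and is assumed to expose all(), as the store implementations touched by this commit do:

    try (KeyValueIterator<String, Long> iter = store.all()) {
        while (iter.hasNext()) {
            KeyValue<String, Long> kv = iter.next();  // was Entry<K, V> with key()/value() accessors
            System.out.println(kv.key + " -> " + kv.value);
        }
    }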
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index e4faed1..d448044 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -19,6 +19,7 @@
package org.apache.kafka.streams.state;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.StateStore;
import java.util.List;
@@ -55,7 +56,7 @@ public interface KeyValueStore<K, V> extends StateStore {
* @param entries A list of entries to put into the store.
* @throws NullPointerException If null is used for any key or value.
*/
- abstract public void putAll(List<Entry<K, V>> entries);
+ abstract public void putAll(List<KeyValue<K, V>> entries);
/**
* Delete the value from the store (if there is one)
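The batched write path changes shape the same way. A brief migration sketch, with "store" again a hypothetical KeyValueStore<String, Long>:

    import java.util.Arrays;
    import org.apache.kafka.streams.KeyValue;

    store.putAll(Arrays.asList(
            new KeyValue<>("alice", 1L),   // previously new Entry<>("alice", 1L)
            new KeyValue<>("bob", 2L)));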
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
index 55d1ac3..08cd049 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -19,7 +19,7 @@
package org.apache.kafka.streams.state;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import java.util.Iterator;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 286db1b..4856b09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -18,10 +18,10 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Entry;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Serdes;
@@ -97,9 +97,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
@Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
+ public void putAll(List<KeyValue<K, V>> entries) {
+ for (KeyValue<K, V> entry : entries)
+ put(entry.key, entry.value);
}
@Override
@@ -140,9 +140,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
@Override
- public Entry<K, V> next() {
+ public KeyValue<K, V> next() {
Map.Entry<K, V> entry = iter.next();
- return new Entry<>(entry.getKey(), entry.getValue());
+ return new KeyValue<>(entry.getKey(), entry.getValue());
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 6a38423..22ee3f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -17,10 +17,10 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Entry;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Serdes;
@@ -131,9 +131,9 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
}
@Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
+ public void putAll(List<KeyValue<K, V>> entries) {
+ for (KeyValue<K, V> entry : entries)
+ put(entry.key, entry.value);
}
@Override
@@ -179,9 +179,9 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
}
@Override
- public Entry<K, V> next() {
+ public KeyValue<K, V> next() {
lastKey = keys.next();
- return new Entry<>(lastKey, entries.get(lastKey));
+ return new KeyValue<>(lastKey, entries.get(lastKey));
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 21f73b0..d5fe44a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -18,13 +18,13 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.state.Entry;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Serdes;
@@ -47,7 +47,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
private Sensor rangeTime;
private Sensor flushTime;
private Sensor restoreTime;
- private StreamingMetrics metrics;
+ private StreamsMetrics metrics;
private boolean loggingEnabled = true;
private StoreChangeLogger<K, V> changeLogger = null;
@@ -141,14 +141,14 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
}
@Override
- public void putAll(List<Entry<K, V>> entries) {
+ public void putAll(List<KeyValue<K, V>> entries) {
long startNs = time.nanoseconds();
try {
this.inner.putAll(entries);
if (loggingEnabled) {
- for (Entry<K, V> entry : entries) {
- K key = entry.key();
+ for (KeyValue<K, V> entry : entries) {
+ K key = entry.key;
changeLogger.add(key);
}
changeLogger.maybeLogChange(this.getter);
@@ -231,7 +231,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
}
@Override
- public Entry<K1, V1> next() {
+ public KeyValue<K1, V1> next() {
return iter.next();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 821927d..862c322 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.state.Serdes;
@@ -40,7 +40,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
private Sensor rangeTime;
private Sensor flushTime;
private Sensor restoreTime;
- private StreamingMetrics metrics;
+ private StreamsMetrics metrics;
private boolean loggingEnabled = true;
private StoreChangeLogger<byte[], byte[]> changeLogger = null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 8a600f9..6c77ab2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -18,8 +18,8 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.Entry;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Serdes;
@@ -147,9 +147,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
@Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
+ public void putAll(List<KeyValue<K, V>> entries) {
+ for (KeyValue<K, V> entry : entries)
+ put(entry.key, entry.value);
}
@Override
@@ -200,8 +200,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
return iter.key();
}
- protected Entry<K, V> getEntry() {
- return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+ protected KeyValue<K, V> getKeyValue() {
+ return new KeyValue<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
}
@Override
@@ -210,11 +210,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
@Override
- public Entry<K, V> next() {
+ public KeyValue<K, V> next() {
if (!hasNext())
throw new NoSuchElementException();
- Entry<K, V> entry = this.getEntry();
+ KeyValue<K, V> entry = this.getKeyValue();
iter.next();
return entry;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 933ed91..d854c92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -20,9 +20,8 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.Entry;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Serdes;
import org.apache.kafka.streams.state.WindowStore;
@@ -86,10 +85,10 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
if (index >= iterators.length)
throw new NoSuchElementException();
- Entry<byte[], byte[]> entry = iterators[index].next();
+ KeyValue<byte[], byte[]> kv = iterators[index].next();
- return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()),
- serdes.valueFrom(entry.value()));
+ return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(kv.key),
+ serdes.valueFrom(kv.value));
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
deleted file mode 100644
index 3b3fc9b..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Map;
-import java.util.Properties;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-
-
-public class StreamingConfigTest {
-
- private Properties props = new Properties();
- private StreamingConfig streamingConfig;
- private StreamThread streamThreadPlaceHolder;
-
-
- @Before
- public void setUp() {
- props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
- streamingConfig = new StreamingConfig(props);
- }
-
- @Test
- public void testGetProducerConfigs() throws Exception {
- Map<String, Object> returnedProps = streamingConfig.getProducerConfigs("client");
- assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer");
- }
-
- @Test
- public void testGetConsumerConfigs() throws Exception {
- Map<String, Object> returnedProps = streamingConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client");
- assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
- assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job");
-
- }
-
- @Test
- public void testGetRestoreConsumerConfigs() throws Exception {
- Map<String, Object> returnedProps = streamingConfig.getRestoreConsumerConfigs("client");
- assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
- assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
new file mode 100644
index 0000000..777fae5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Properties;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+
+public class StreamsConfigTest {
+
+ private Properties props = new Properties();
+ private StreamsConfig streamsConfig;
+ private StreamThread streamThreadPlaceHolder;
+
+
+ @Before
+ public void setUp() {
+ props.put(StreamsConfig.JOB_ID_CONFIG, "streams-config-test");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+ streamsConfig = new StreamsConfig(props);
+ }
+
+ @Test
+ public void testGetProducerConfigs() throws Exception {
+ Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client");
+ assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer");
+ }
+
+ @Test
+ public void testGetConsumerConfigs() throws Exception {
+ Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client");
+ assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
+ assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job");
+
+ }
+
+ @Test
+ public void testGetRestoreConsumerConfigs() throws Exception {
+ Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client");
+ assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
+ assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
+ }
+}
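For reference, a sketch of what the group-id assertions above pin down, reusing the streamsConfig built in setUp(): the main consumer joins the job's group, while the restore consumer stays outside group management because it assigns changelog partitions manually during state restoration.

    Map<String, Object> consumerProps = streamsConfig.getConsumerConfigs(null, "example-job", "client");
    assertEquals("example-job", consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG));

    // no group id: the restore consumer must not participate in rebalances
    Map<String, Object> restoreProps = streamsConfig.getRestoreConsumerConfigs("client");
    assertNull(restoreProps.get(ConsumerConfig.GROUP_ID_CONFIG));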
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index a55fd30..693f58e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 880adce..f226cee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.KStreamTestDriver;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 0f7cb6a..73c517b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 0b7b1e7..426259f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index aa09e74..12bfb9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 1527f17..e3cf22b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.KStreamTestDriver;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 67b83f5..feabc08 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
deleted file mode 100644
index 1b8cbb8..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-
-public class WindowedStreamPartitionerTest {
-
- private String topicName = "topic";
-
- private IntegerSerializer keySerializer = new IntegerSerializer();
- private StringSerializer valSerializer = new StringSerializer();
-
- private List<PartitionInfo> infos = Arrays.asList(
- new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
- );
-
- private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
-
- @Test
- public void testCopartitioning() {
-
- Random rand = new Random();
-
- DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
-
- WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
- WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
-
- for (int k = 0; k < 10; k++) {
- Integer key = rand.nextInt();
- byte[] keyBytes = keySerializer.serialize(topicName, key);
-
- String value = key.toString();
- byte[] valueBytes = valSerializer.serialize(topicName, value);
-
- Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
-
- for (int w = 0; w < 10; w++) {
- HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
-
- Windowed<Integer> windowedKey = new Windowed<>(key, window);
- Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
-
- assertEquals(expected, actual);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
new file mode 100644
index 0000000..18494fd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class WindowedStreamsPartitionerTest {
+
+ private String topicName = "topic";
+
+ private IntegerSerializer keySerializer = new IntegerSerializer();
+ private StringSerializer valSerializer = new StringSerializer();
+
+ private List<PartitionInfo> infos = Arrays.asList(
+ new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]),
+ new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
+ );
+
+ private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+
+ @Test
+ public void testCopartitioning() {
+
+ Random rand = new Random();
+
+ DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
+
+ WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
+ WindowedStreamsPartitioner<Integer, String> streamPartitioner = new WindowedStreamsPartitioner<>(windowedSerializer);
+
+ for (int k = 0; k < 10; k++) {
+ Integer key = rand.nextInt();
+ byte[] keyBytes = keySerializer.serialize(topicName, key);
+
+ String value = key.toString();
+ byte[] valueBytes = valSerializer.serialize(topicName, value);
+
+ Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
+
+ for (int w = 0; w < 10; w++) {
+ HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
+
+ Windowed<Integer> windowedKey = new Windowed<>(key, window);
+ Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
+
+ assertEquals(expected, actual);
+ }
+ }
+ }
+
+}
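The copartitioning test above encodes the invariant that matters for joins: a windowed key must land on the same partition as its underlying key, so the partitioner hashes only the serialized key bytes and ignores the window bounds. A self-contained sketch of that placement rule, assuming the default partitioner's murmur2-on-key-bytes scheme (class and method names here are illustrative, not the committed implementation):

import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.utils.Utils;

public class WindowedPlacementSketch {

    // Positive murmur2 hash of the key bytes, modulo the partition count,
    // mirroring how the default producer partitioner places keyed records.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        IntegerSerializer keySerializer = new IntegerSerializer();
        byte[] keyBytes = keySerializer.serialize("topic", 42);
        // The window is never part of the hash, so every window of key 42
        // maps to the same one of the six partitions used in the test above.
        System.out.println(partitionFor(keyBytes, 6));
    }
}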
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
deleted file mode 100644
index 92d7b6a..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
-import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-
-import static org.junit.Assert.assertEquals;
-
-public class KafkaStreamingPartitionAssignorTest {
-
- private TopicPartition t1p0 = new TopicPartition("topic1", 0);
- private TopicPartition t1p1 = new TopicPartition("topic1", 1);
- private TopicPartition t1p2 = new TopicPartition("topic1", 2);
- private TopicPartition t2p0 = new TopicPartition("topic2", 0);
- private TopicPartition t2p1 = new TopicPartition("topic2", 1);
- private TopicPartition t2p2 = new TopicPartition("topic2", 2);
- private TopicPartition t3p0 = new TopicPartition("topic3", 0);
- private TopicPartition t3p1 = new TopicPartition("topic3", 1);
- private TopicPartition t3p2 = new TopicPartition("topic3", 2);
- private TopicPartition t3p3 = new TopicPartition("topic3", 3);
-
- private Set<String> allTopics = Utils.mkSet("topic1", "topic2");
-
- private List<PartitionInfo> infos = Arrays.asList(
- new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
- new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
- );
-
- private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
-
- private final TaskId task0 = new TaskId(0, 0);
- private final TaskId task1 = new TaskId(0, 1);
- private final TaskId task2 = new TaskId(0, 2);
- private final TaskId task3 = new TaskId(0, 3);
-
- private Properties configProps() {
- return new Properties() {
- {
- setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
- setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
- setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
- }
- };
- }
-
- private ByteArraySerializer serializer = new ByteArraySerializer();
-
- @SuppressWarnings("unchecked")
- @Test
- public void testSubscription() throws Exception {
- StreamingConfig config = new StreamingConfig(configProps());
-
- MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
- MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source1", "topic1");
- builder.addSource("source2", "topic2");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-
- final Set<TaskId> prevTasks = Utils.mkSet(
- new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
- final Set<TaskId> cachedTasks = Utils.mkSet(
- new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
- new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
-
- String clientId = "client-id";
- UUID processId = UUID.randomUUID();
- StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) {
- @Override
- public Set<TaskId> prevTasks() {
- return prevTasks;
- }
- @Override
- public Set<TaskId> cachedTasks() {
- return cachedTasks;
- }
- };
-
- KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
- partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));
-
- PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
-
- Collections.sort(subscription.topics());
- assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics());
-
- Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
- standbyTasks.removeAll(prevTasks);
-
- SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
- assertEquals(info.encode(), subscription.userData());
- }
-
- @Test
- public void testAssignBasic() throws Exception {
- StreamingConfig config = new StreamingConfig(configProps());
-
- MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
- MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source1", "topic1");
- builder.addSource("source2", "topic2");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
- List<String> topics = Utils.mkList("topic1", "topic2");
- Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
-
- final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
- final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
- final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
- final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
- final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
- final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
-
- UUID uuid1 = UUID.randomUUID();
- UUID uuid2 = UUID.randomUUID();
- String client1 = "client1";
- String client2 = "client2";
-
- StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
-
- KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
- partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-
- Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
- subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
- subscriptions.put("consumer11",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
- subscriptions.put("consumer20",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
-
- Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-
- // check assigned partitions
- assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
- Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
- assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
-
- // check assignment info
-
- Set<TaskId> allActiveTasks = new HashSet<>();
-
- // the first consumer
- AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
- allActiveTasks.addAll(info10.activeTasks);
-
- // the second consumer
- AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
- allActiveTasks.addAll(info11.activeTasks);
-
- assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
-
- // the third consumer
- AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
- allActiveTasks.addAll(info20.activeTasks);
-
- assertEquals(3, allActiveTasks.size());
- assertEquals(allTasks, new HashSet<>(allActiveTasks));
-
- assertEquals(3, allActiveTasks.size());
- assertEquals(allTasks, allActiveTasks);
- }
-
- @Test
- public void testAssignWithNewTasks() throws Exception {
- StreamingConfig config = new StreamingConfig(configProps());
-
- MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
- MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source1", "topic1");
- builder.addSource("source2", "topic2");
- builder.addSource("source3", "topic3");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
- List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
- Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);
-
- // assuming that previous tasks do not have topic3
- final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
- final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
- final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
-
- UUID uuid1 = UUID.randomUUID();
- UUID uuid2 = UUID.randomUUID();
- String client1 = "client1";
- String client2 = "client2";
-
- StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
-
- KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
- partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-
- Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
- subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode()));
- subscriptions.put("consumer11",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode()));
- subscriptions.put("consumer20",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode()));
-
- Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-
- // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match
- // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and
- // then later ones will be re-assigned to other hosts due to load balancing
- Set<TaskId> allActiveTasks = new HashSet<>();
- Set<TopicPartition> allPartitions = new HashSet<>();
- AssignmentInfo info;
-
- info = AssignmentInfo.decode(assignments.get("consumer10").userData());
- allActiveTasks.addAll(info.activeTasks);
- allPartitions.addAll(assignments.get("consumer10").partitions());
-
- info = AssignmentInfo.decode(assignments.get("consumer11").userData());
- allActiveTasks.addAll(info.activeTasks);
- allPartitions.addAll(assignments.get("consumer11").partitions());
-
- info = AssignmentInfo.decode(assignments.get("consumer20").userData());
- allActiveTasks.addAll(info.activeTasks);
- allPartitions.addAll(assignments.get("consumer20").partitions());
-
- assertEquals(allTasks, allActiveTasks);
- assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions);
- }
-
- @Test
- public void testAssignWithStates() throws Exception {
- StreamingConfig config = new StreamingConfig(configProps());
-
- MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
- MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("source1", "topic1");
- builder.addSource("source2", "topic2");
-
- builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
- builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");
-
- builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
- builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
- builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");
-
- List<String> topics = Utils.mkList("topic1", "topic2");
-
- TaskId task00 = new TaskId(0, 0);
- TaskId task01 = new TaskId(0, 1);
- TaskId task02 = new TaskId(0, 2);
- TaskId task10 = new TaskId(1, 0);
- TaskId task11 = new TaskId(1, 1);
- TaskId task12 = new TaskId(1, 2);
-
- UUID uuid1 = UUID.randomUUID();
- UUID uuid2 = UUID.randomUUID();
- String client1 = "client1";
- String client2 = "client2";
-
- StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
-
- KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
- partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-
- Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
- subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
- subscriptions.put("consumer11",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
- subscriptions.put("consumer20",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
-
- Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-
- // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match
- assertEquals(2, assignments.get("consumer10").partitions().size());
- assertEquals(2, assignments.get("consumer11").partitions().size());
- assertEquals(2, assignments.get("consumer20").partitions().size());
-
- assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size());
- assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size());
- assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size());
-
- // check tasks for state topics
- assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1"));
- assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2"));
- assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3"));
- }
-
- @Test
- public void testAssignWithStandbyReplicas() throws Exception {
- Properties props = configProps();
- props.setProperty(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
- StreamingConfig config = new StreamingConfig(props);
-
- MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
- MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source1", "topic1");
- builder.addSource("source2", "topic2");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
- List<String> topics = Utils.mkList("topic1", "topic2");
- Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
-
-
- final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
- final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
- final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
- final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
- final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
- final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
-
- UUID uuid1 = UUID.randomUUID();
- UUID uuid2 = UUID.randomUUID();
- String client1 = "client1";
- String client2 = "client2";
-
- StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime());
-
- KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
- partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-
- Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
- subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
- subscriptions.put("consumer11",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
- subscriptions.put("consumer20",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
-
- Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
-
- Set<TaskId> allActiveTasks = new HashSet<>();
- Set<TaskId> allStandbyTasks = new HashSet<>();
-
- // the first consumer
- AssignmentInfo info10 = checkAssignment(assignments.get("consumer10"));
- allActiveTasks.addAll(info10.activeTasks);
- allStandbyTasks.addAll(info10.standbyTasks.keySet());
-
- // the second consumer
- AssignmentInfo info11 = checkAssignment(assignments.get("consumer11"));
- allActiveTasks.addAll(info11.activeTasks);
- allStandbyTasks.addAll(info11.standbyTasks.keySet());
-
- // check active tasks assigned to the first client
- assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
- assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
-
- // the third consumer
- AssignmentInfo info20 = checkAssignment(assignments.get("consumer20"));
- allActiveTasks.addAll(info20.activeTasks);
- allStandbyTasks.addAll(info20.standbyTasks.keySet());
-
- // all task ids are in the active tasks and also in the standby tasks
-
- assertEquals(3, allActiveTasks.size());
- assertEquals(allTasks, allActiveTasks);
-
- assertEquals(3, allStandbyTasks.size());
- assertEquals(allTasks, allStandbyTasks);
- }
-
- private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) {
-
- // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group.
-
- AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
-
- // check if the number of assigned partitions == the size of active task id list
- assertEquals(assignment.partitions().size(), info.activeTasks.size());
-
- // check if active tasks are consistent
- List<TaskId> activeTasks = new ArrayList<>();
- Set<String> activeTopics = new HashSet<>();
- for (TopicPartition partition : assignment.partitions()) {
- // since default grouper, taskid.partition == partition.partition()
- activeTasks.add(new TaskId(0, partition.partition()));
- activeTopics.add(partition.topic());
- }
- assertEquals(activeTasks, info.activeTasks);
-
- // check if active partitions cover all topics
- assertEquals(allTopics, activeTopics);
-
- // check if standby tasks are consistent
- Set<String> standbyTopics = new HashSet<>();
- for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
- TaskId id = entry.getKey();
- Set<TopicPartition> partitions = entry.getValue();
- for (TopicPartition partition : partitions) {
- // since default grouper, taskid.partition == partition.partition()
- assertEquals(id.partition, partition.partition());
-
- standbyTopics.add(partition.topic());
- }
- }
-
- if (info.standbyTasks.size() > 0)
- // check if standby partitions cover all topics
- assertEquals(allTopics, standbyTopics);
-
- return info;
- }
-
- @Test
- public void testOnAssignment() throws Exception {
- StreamingConfig config = new StreamingConfig(configProps());
-
- MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
- MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
-
- TopicPartition t2p3 = new TopicPartition("topic2", 3);
-
- TopologyBuilder builder = new TopologyBuilder();
- builder.addSource("source1", "topic1");
- builder.addSource("source2", "topic2");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-
- UUID uuid = UUID.randomUUID();
- String client1 = "client1";
-
- StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid, new Metrics(), new SystemTime());
-
- KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
- partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
-
- List<TaskId> activeTaskList = Utils.mkList(task0, task3);
- Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
- standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
- standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
-
- AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks);
- PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
- partitionAssignor.onAssignment(assignment);
-
- assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0));
- assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3));
- assertEquals(standbyTasks, partitionAssignor.standbyTasks());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index f2ef2ea..60bd309 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -27,12 +27,12 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -60,21 +60,22 @@ public class ProcessorTopologyTest {
private static long timestamp = 1000L;
private ProcessorTopologyTestDriver driver;
- private StreamingConfig config;
+ private StreamsConfig config;
@Before
public void setup() {
// Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
File localState = StateUtils.tempDir();
Properties props = new Properties();
- props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- props.setProperty(StreamingConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
- props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
- props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- this.config = new StreamingConfig(props);
+ props.setProperty(StreamsConfig.JOB_ID_CONFIG, "processor-topology-test");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
+ props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
+ props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ this.config = new StreamsConfig(props);
}
@After
@@ -193,8 +194,8 @@ public class ProcessorTopologyTest {
assertNull(driver.readOutput(topic));
}
- protected <K, V> StreamPartitioner<K, V> constantPartitioner(final Integer partition) {
- return new StreamPartitioner<K, V>() {
+ protected <K, V> StreamsPartitioner<K, V> constantPartitioner(final Integer partition) {
+ return new StreamsPartitioner<K, V>() {
@Override
public Integer partition(K key, V value, int numPartitions) {
return partition;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 85a8a15..fd604b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -85,17 +85,18 @@ public class StandbyTaskTest {
)
);
- private StreamingConfig createConfig(final File baseDir) throws Exception {
- return new StreamingConfig(new Properties() {
+ private StreamsConfig createConfig(final File baseDir) throws Exception {
+ return new StreamsConfig(new Properties() {
{
- setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
- setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
- setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
- setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+ setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+ setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+ setProperty(StreamsConfig.JOB_ID_CONFIG, "standby-task-test");
+ setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+ setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
}
});
}
@@ -130,7 +131,7 @@ public class StandbyTaskTest {
public void testStorePartitions() throws Exception {
File baseDir = Files.createTempDirectory("test").toFile();
try {
- StreamingConfig config = createConfig(baseDir);
+ StreamsConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
@@ -145,7 +146,7 @@ public class StandbyTaskTest {
public void testUpdateNonPersistentStore() throws Exception {
File baseDir = Files.createTempDirectory("test").toFile();
try {
- StreamingConfig config = createConfig(baseDir);
+ StreamsConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -164,7 +165,7 @@ public class StandbyTaskTest {
public void testUpdate() throws Exception {
File baseDir = Files.createTempDirectory("test").toFile();
try {
- StreamingConfig config = createConfig(baseDir);
+ StreamsConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
@@ -227,7 +228,7 @@ public class StandbyTaskTest {
new PartitionInfo("ktable1", 2, Node.noNode(), new Node[0], new Node[0])
));
- StreamingConfig config = createConfig(baseDir);
+ StreamsConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, jobId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
[4/4] kafka git commit: KAFKA-3136: Rename KafkaStreaming to KafkaStreams
Posted by gw...@apache.org.
KAFKA-3136: Rename KafkaStreaming to KafkaStreams
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Gwen Shapira
Closes #800 from guozhangwang/KRename
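For code tracking trunk, the user-facing migration is a one-for-one rename. A hedged before/after sketch assembled from the configuration keys this patch uses in its own tests (the broker address and job id are placeholders, and further settings may be needed depending on your topology):

import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class RenameMigrationSketch {
    public static void main(String[] args) {
        // Before this commit the equivalent bootstrap was:
        //   StreamingConfig config = new StreamingConfig(props);
        //   KafkaStreaming streaming = new KafkaStreaming(builder, config);
        //   streaming.start();
        Properties props = new Properties();
        props.setProperty(StreamsConfig.JOB_ID_CONFIG, "rename-migration-sketch"); // required after this commit
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source", "topic1"); // trivial read-only topology, for illustration only

        KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
        streams.start();
        streams.close();
    }
}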
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21c6cfe5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21c6cfe5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21c6cfe5
Branch: refs/heads/trunk
Commit: 21c6cfe50dbe818a392c28f48ce8891f7f99aaf6
Parents: 91ba074
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Jan 22 13:00:00 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Jan 22 13:00:00 2016 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/KafkaStreaming.java | 167 ------
.../org/apache/kafka/streams/KafkaStreams.java | 171 +++++++
.../java/org/apache/kafka/streams/KeyValue.java | 37 ++
.../apache/kafka/streams/StreamingConfig.java | 314 ------------
.../apache/kafka/streams/StreamingMetrics.java | 27 -
.../org/apache/kafka/streams/StreamsConfig.java | 303 +++++++++++
.../apache/kafka/streams/StreamsMetrics.java | 27 +
.../kafka/streams/examples/KStreamJob.java | 24 +-
.../kafka/streams/examples/ProcessorJob.java | 34 +-
.../apache/kafka/streams/kstream/KStream.java | 1 +
.../apache/kafka/streams/kstream/KTable.java | 1 +
.../apache/kafka/streams/kstream/KeyValue.java | 34 --
.../kstream/internals/KStreamAggregate.java | 2 +-
.../kstream/internals/KStreamFlatMap.java | 2 +-
.../streams/kstream/internals/KStreamImpl.java | 10 +-
.../kstream/internals/KStreamKStreamJoin.java | 2 +-
.../streams/kstream/internals/KStreamMap.java | 2 +-
.../kstream/internals/KStreamReduce.java | 2 +-
.../kstream/internals/KStreamTransform.java | 2 +-
.../streams/kstream/internals/KTableImpl.java | 2 +-
.../kstream/internals/KTableRepartitionMap.java | 2 +-
.../internals/WindowedStreamPartitioner.java | 52 --
.../internals/WindowedStreamsPartitioner.java | 52 ++
.../streams/processor/ProcessorContext.java | 6 +-
.../streams/processor/StreamPartitioner.java | 59 ---
.../streams/processor/StreamsPartitioner.java | 59 +++
.../streams/processor/TopologyBuilder.java | 76 +--
.../processor/internals/AbstractTask.java | 6 +-
.../KafkaStreamingPartitionAssignor.java | 483 ------------------
.../internals/ProcessorContextImpl.java | 12 +-
.../processor/internals/RecordCollector.java | 4 +-
.../streams/processor/internals/SinkNode.java | 6 +-
.../processor/internals/StandbyContextImpl.java | 12 +-
.../processor/internals/StandbyTask.java | 12 +-
.../internals/StreamPartitionAssignor.java | 483 ++++++++++++++++++
.../streams/processor/internals/StreamTask.java | 16 +-
.../processor/internals/StreamThread.java | 48 +-
.../org/apache/kafka/streams/state/Entry.java | 42 --
.../kafka/streams/state/KeyValueIterator.java | 4 +-
.../kafka/streams/state/KeyValueStore.java | 3 +-
.../streams/state/WindowStoreIterator.java | 2 +-
.../InMemoryKeyValueStoreSupplier.java | 12 +-
.../InMemoryLRUCacheStoreSupplier.java | 12 +-
.../state/internals/MeteredKeyValueStore.java | 14 +-
.../state/internals/MeteredWindowStore.java | 6 +-
.../streams/state/internals/RocksDBStore.java | 16 +-
.../state/internals/RocksDBWindowStore.java | 9 +-
.../kafka/streams/StreamingConfigTest.java | 75 ---
.../apache/kafka/streams/StreamsConfigTest.java | 76 +++
.../kstream/internals/KStreamFlatMapTest.java | 2 +-
.../internals/KStreamKTableLeftJoinTest.java | 2 +-
.../kstream/internals/KStreamMapTest.java | 2 +-
.../kstream/internals/KStreamTransformTest.java | 2 +-
.../kstream/internals/KTableKTableJoinTest.java | 2 +-
.../internals/KTableKTableLeftJoinTest.java | 2 +-
.../internals/KTableKTableOuterJoinTest.java | 2 +-
.../WindowedStreamPartitionerTest.java | 84 ---
.../WindowedStreamsPartitionerTest.java | 84 +++
.../KafkaStreamingPartitionAssignorTest.java | 508 ------------------
.../internals/ProcessorTopologyTest.java | 27 +-
.../processor/internals/StandbyTaskTest.java | 31 +-
.../internals/StreamPartitionAssignorTest.java | 509 +++++++++++++++++++
.../processor/internals/StreamTaskTest.java | 27 +-
.../processor/internals/StreamThreadTest.java | 37 +-
.../streams/state/KeyValueStoreTestDriver.java | 45 +-
.../internals/AbstractKeyValueStoreTest.java | 22 +-
.../state/internals/RocksDBWindowStoreTest.java | 38 +-
.../apache/kafka/test/KStreamTestDriver.java | 4 +-
.../apache/kafka/test/MockProcessorContext.java | 14 +-
.../apache/kafka/test/NoOpKeyValueMapper.java | 2 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 32 +-
71 files changed, 2133 insertions(+), 2168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
deleted file mode 100644
index 0d99739..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Kafka Streaming allows for performing continuous computation on input coming from one or more input topics and
- * sends output to zero or more output topics.
- * <p>
- * This processing is defined by using the {@link TopologyBuilder} class or its superclass KStreamBuilder to specify
- * the transformation.
- * The {@link KafkaStreaming} instance will be responsible for the lifecycle of these processors. It will instantiate and
- * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
- * <p>
- * This streaming instance will co-ordinate with any other instances (whether in this same process, on other processes
- * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being
- * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shutdown or
- * started in the appropriate processes to balance processing load.
- * <p>
- * Internally the {@link KafkaStreaming} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
- * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output.
- * <p>
- * A simple example might look like this:
- * <pre>
- * Map<String, Object> props = new HashMap<>();
- * props.put("bootstrap.servers", "localhost:4242");
- * props.put("key.deserializer", StringDeserializer.class);
- * props.put("value.deserializer", StringDeserializer.class);
- * props.put("key.serializer", StringSerializer.class);
- * props.put("value.serializer", IntegerSerializer.class);
- * props.put("timestamp.extractor", MyTimestampExtractor.class);
- * StreamingConfig config = new StreamingConfig(props);
- *
- * KStreamBuilder builder = new KStreamBuilder();
- * builder.from("topic1").mapValue(value -> value.length()).to("topic2");
- *
- * KafkaStreaming streaming = new KafkaStreaming(builder, config);
- * streaming.start();
- * </pre>
- *
- */
-public class KafkaStreaming {
-
- private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
- private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
- private static final String JMX_PREFIX = "kafka.streams";
-
- // container states
- private static final int CREATED = 0;
- private static final int RUNNING = 1;
- private static final int STOPPED = 2;
- private int state = CREATED;
-
- private final StreamThread[] threads;
-
- // processId is expected to be unique across JVMs and to be used
- // in userData of the subscription request to allow assignor be aware
- // of the co-location of stream thread's consumers. It is for internal
- // usage only and should not be exposed to users at all.
- private final UUID processId;
-
- public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
- // create the metrics
- Time time = new SystemTime();
-
- this.processId = UUID.randomUUID();
-
- String jobId = config.getString(StreamingConfig.JOB_ID_CONFIG);
- if (jobId.length() <= 0)
- jobId = "kafka-streams";
-
- String clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
- if (clientId.length() <= 0)
- clientId = jobId + "-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
-
- List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
- reporters.add(new JmxReporter(JMX_PREFIX));
-
- MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
- .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
- TimeUnit.MILLISECONDS);
-
- Metrics metrics = new Metrics(metricConfig, reporters, time);
-
- this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
- for (int i = 0; i < this.threads.length; i++) {
- this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
- }
- }
-
- /**
- * Start the stream process by starting all its threads
- */
- public synchronized void start() {
- log.debug("Starting Kafka Stream process");
-
- if (state == CREATED) {
- for (StreamThread thread : threads)
- thread.start();
-
- state = RUNNING;
-
- log.info("Started Kafka Stream process");
- } else {
- throw new IllegalStateException("This process was already started.");
- }
- }
-
- /**
- * Shutdown this stream process by signaling the threads to stop,
- * wait for them to join and clean up the process instance.
- */
- public synchronized void close() {
- log.debug("Stopping Kafka Stream process");
-
- if (state == RUNNING) {
- // signal the threads to stop and wait
- for (StreamThread thread : threads)
- thread.close();
-
- for (StreamThread thread : threads) {
- try {
- thread.join();
- } catch (InterruptedException ex) {
- Thread.interrupted();
- }
- }
-
- state = STOPPED;
-
- log.info("Stopped Kafka Stream process");
- } else {
- throw new IllegalStateException("This process has not started yet.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
new file mode 100644
index 0000000..071cef6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
+ * for sending output to zero or more output topics.
+ * <p>
+ * This processing is defined by using the {@link TopologyBuilder} class or its subclass KStreamBuilder to specify
+ * the transformation.
+ * The {@link KafkaStreams} instance will be responsible for the lifecycle of these processors. It will instantiate and
+ * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
+ * <p>
+ * This {@link KafkaStreams} instance will coordinate with any other instances (whether in this same process, in other
+ * processes on this machine, or on remote machines). These processes will divide up the work so that all partitions are
+ * being consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shut down or
+ * started in the appropriate processes to balance the processing load.
+ * <p>
+ * Internally a {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
+ * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance, used for writing output and reading input respectively.
+ * <p>
+ * A simple example might look like this:
+ * <pre>
+ * Map<String, Object> props = new HashMap<>();
+ * props.put("bootstrap.servers", "localhost:4242");
+ * props.put("key.deserializer", StringDeserializer.class);
+ * props.put("value.deserializer", StringDeserializer.class);
+ * props.put("key.serializer", StringSerializer.class);
+ * props.put("value.serializer", IntegerSerializer.class);
+ * props.put("timestamp.extractor", MyTimestampExtractor.class);
+ * StreamsConfig config = new StreamsConfig(props);
+ *
+ * KStreamBuilder builder = new KStreamBuilder();
+ * builder.from("topic1").mapValue(value -> value.length()).to("topic2");
+ *
+ * KafkaStreams streams = new KafkaStreams(builder, config);
+ * streams.start();
+ * </pre>
+ *
+ */
+public class KafkaStreams {
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
+ private static final AtomicInteger STREAM_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+ private static final String JMX_PREFIX = "kafka.streams";
+
+ // container states
+ private static final int CREATED = 0;
+ private static final int RUNNING = 1;
+ private static final int STOPPED = 2;
+ private int state = CREATED;
+
+ private final StreamThread[] threads;
+
+ // processId is expected to be unique across JVMs and is used
+ // in the userData of the subscription request to let the assignor be aware
+ // of the co-location of each stream thread's consumers. It is for internal
+ // usage only and should not be exposed to users at all.
+ private final UUID processId;
+
+ public KafkaStreams(TopologyBuilder builder, Properties props) {
+ this(builder, new StreamsConfig(props));
+ }
+
+ public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
+ // create the metrics
+ Time time = new SystemTime();
+
+ this.processId = UUID.randomUUID();
+
+ // JobId is a required config and hence should always have a value
+ String jobId = config.getString(StreamsConfig.JOB_ID_CONFIG);
+
+ String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
+ if (clientId.length() <= 0)
+ clientId = jobId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
+
+ List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+ MetricsReporter.class);
+ reporters.add(new JmxReporter(JMX_PREFIX));
+
+ MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+ .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+ TimeUnit.MILLISECONDS);
+
+ Metrics metrics = new Metrics(metricConfig, reporters, time);
+
+ this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+ for (int i = 0; i < this.threads.length; i++) {
+ this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
+ }
+ }
+
+ /**
+ * Start the stream process by starting all its threads
+ */
+ public synchronized void start() {
+ log.debug("Starting Kafka Stream process");
+
+ if (state == CREATED) {
+ for (StreamThread thread : threads)
+ thread.start();
+
+ state = RUNNING;
+
+ log.info("Started Kafka Stream process");
+ } else {
+ throw new IllegalStateException("This process was already started.");
+ }
+ }
+
+ /**
+ * Shut down this stream process by signaling its threads to stop,
+ * waiting for them to join, and cleaning up the process instance.
+ */
+ public synchronized void close() {
+ log.debug("Stopping Kafka Stream process");
+
+ if (state == RUNNING) {
+ // signal the threads to stop and wait
+ for (StreamThread thread : threads)
+ thread.close();
+
+ for (StreamThread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException ex) {
+ // restore the interrupt status instead of swallowing it
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ state = STOPPED;
+
+ log.info("Stopped Kafka Stream process");
+ } else {
+ throw new IllegalStateException("This process has not started yet.");
+ }
+ }
+}
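
To make the lifecycle contract above concrete, here is a minimal, hedged sketch of driving the renamed client from user code. The job id, broker address, serde choices and empty topology are illustrative assumptions, not part of this commit; WallclockTimestampExtractor is assumed to be the extractor shipped with the bundled examples.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class StreamsLifecycleSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            // job.id, bootstrap.servers, the four serde classes and
            // timestamp.extractor are defined without defaults, so they
            // must all be supplied.
            props.put(StreamsConfig.JOB_ID_CONFIG, "lifecycle-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

            TopologyBuilder builder = new TopologyBuilder();
            // ... add sources, processors and sinks here ...

            final KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start(); // legal only once, while the state is CREATED

            // close() requires the RUNNING state, so a shutdown hook is a
            // convenient place to stop and join all stream threads.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    streams.close();
                }
            });
        }
    }
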
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
new file mode 100644
index 0000000..472e677
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+public class KeyValue<K, V> {
+
+ public final K key;
+ public final V value;
+
+ public KeyValue(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public static <K, V> KeyValue<K, V> pair(K key, V value) {
+ return new KeyValue<>(key, value);
+ }
+
+ public String toString() {
+ return "KeyValue(" + key + ", " + value + ")";
+ }
+}
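
KeyValue has moved from the kstream package to the top level so the processor and DSL layers can share one pair type. A short hedged sketch of the pair() factory follows; the parsing helper is purely illustrative.

    import org.apache.kafka.streams.KeyValue;

    public class KeyValueSketch {

        // Split "word:count" strings into typed pairs; illustrative only.
        public static KeyValue<String, Integer> parse(String line) {
            String[] parts = line.split(":", 2);
            return KeyValue.pair(parts[0], Integer.parseInt(parts[1]));
        }

        public static void main(String[] args) {
            KeyValue<String, Integer> kv = parse("apples:3");
            System.out.println(kv); // prints KeyValue(apples, 3)
        }
    }
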
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
deleted file mode 100644
index e89d030..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
-import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-
-import java.util.Map;
-
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-
-public class StreamingConfig extends AbstractConfig {
-
- private static final ConfigDef CONFIG;
-
- /** <code>state.dir</code> */
- public static final String STATE_DIR_CONFIG = "state.dir";
- private static final String STATE_DIR_DOC = "Directory location for state store.";
-
- /** <code>zookeeper.connect<code/> */
- public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
- private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
-
- /** <code>commit.interval.ms</code> */
- public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
- private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
-
- /** <code>poll.ms</code> */
- public static final String POLL_MS_CONFIG = "poll.ms";
- private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
-
- /** <code>num.stream.threads</code> */
- public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
- private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
-
- /** <code>num.stream.threads</code> */
- public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
- private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
-
- /** <code>buffered.records.per.partition</code> */
- public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
- private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
-
- /** <code>state.cleanup.delay</code> */
- public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
- private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
-
- /** <code>total.records.to.process</code> */
- public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
- private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
-
- /** <code>window.time.ms</code> */
- public static final String WINDOW_TIME_MS_CONFIG = "window.time.ms";
- private static final String WINDOW_TIME_MS_DOC = "Setting this to a non-negative value will cause the processor to get called "
- + "with this frequency even if there is no message.";
-
- /** <code>timestamp.extractor</code> */
- public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
- private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
-
- /** <code>partition.grouper</code> */
- public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
- private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
-
- /** <code>job.id</code> */
- public static final String JOB_ID_CONFIG = "job.id";
- public static final String JOB_ID_DOC = "An id string to identify for the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
-
- /** <code>key.serializer</code> */
- public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
-
- /** <code>value.serializer</code> */
- public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-
- /** <code>key.deserializer</code> */
- public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-
- /** <code>value.deserializer</code> */
- public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-
- /** <code>metrics.sample.window.ms</code> */
- public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
-
- /** <code>metrics.num.samples</code> */
- public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
-
- /** <code>metric.reporters</code> */
- public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
-
- /** <code>bootstrap.servers</code> */
- public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
-
- /** <code>client.id</code> */
- public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
-
- private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
-
- static {
- CONFIG = new ConfigDef().define(JOB_ID_CONFIG,
- Type.STRING,
- "",
- Importance.MEDIUM,
- StreamingConfig.JOB_ID_DOC)
- .define(CLIENT_ID_CONFIG,
- Type.STRING,
- "",
- Importance.MEDIUM,
- CommonClientConfigs.CLIENT_ID_DOC)
- .define(ZOOKEEPER_CONNECT_CONFIG,
- Type.STRING,
- "",
- Importance.HIGH,
- StreamingConfig.ZOOKEEPER_CONNECT_DOC)
- .define(STATE_DIR_CONFIG,
- Type.STRING,
- SYSTEM_TEMP_DIRECTORY,
- Importance.MEDIUM,
- STATE_DIR_DOC)
- .define(COMMIT_INTERVAL_MS_CONFIG,
- Type.LONG,
- 30000,
- Importance.HIGH,
- COMMIT_INTERVAL_MS_DOC)
- .define(POLL_MS_CONFIG,
- Type.LONG,
- 100,
- Importance.LOW,
- POLL_MS_DOC)
- .define(NUM_STREAM_THREADS_CONFIG,
- Type.INT,
- 1,
- Importance.LOW,
- NUM_STREAM_THREADS_DOC)
- .define(NUM_STANDBY_REPLICAS_CONFIG,
- Type.INT,
- 0,
- Importance.LOW,
- NUM_STANDBY_REPLICAS_DOC)
- .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
- Type.INT,
- 1000,
- Importance.LOW,
- BUFFERED_RECORDS_PER_PARTITION_DOC)
- .define(STATE_CLEANUP_DELAY_MS_CONFIG,
- Type.LONG,
- 60000,
- Importance.LOW,
- STATE_CLEANUP_DELAY_MS_DOC)
- .define(TOTAL_RECORDS_TO_PROCESS,
- Type.LONG,
- -1L,
- Importance.LOW,
- TOTAL_RECORDS_TO_DOC)
- .define(WINDOW_TIME_MS_CONFIG,
- Type.LONG,
- -1L,
- Importance.MEDIUM,
- WINDOW_TIME_MS_DOC)
- .define(KEY_SERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
- .define(VALUE_SERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
- .define(KEY_DESERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
- .define(VALUE_DESERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
- .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- TIMESTAMP_EXTRACTOR_CLASS_DOC)
- .define(PARTITION_GROUPER_CLASS_CONFIG,
- Type.CLASS,
- DefaultPartitionGrouper.class,
- Importance.HIGH,
- PARTITION_GROUPER_CLASS_DOC)
- .define(BOOTSTRAP_SERVERS_CONFIG,
- Type.STRING,
- Importance.HIGH,
- CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
- .define(METRIC_REPORTER_CLASSES_CONFIG,
- Type.LIST,
- "",
- Importance.LOW,
- CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
- .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
- Type.LONG,
- 30000,
- atLeast(0),
- Importance.LOW,
- CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
- .define(METRICS_NUM_SAMPLES_CONFIG,
- Type.INT,
- 2,
- atLeast(1),
- Importance.LOW,
- CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
- }
-
- public static class InternalConfig {
- public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
- }
-
- public StreamingConfig(Map<?, ?> props) {
- super(CONFIG, props);
- }
-
- public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
- Map<String, Object> props = getBaseConsumerConfigs();
-
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
- props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
- props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
-
- props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
-
- return props;
- }
-
- public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
- Map<String, Object> props = getBaseConsumerConfigs();
-
- // no need to set group id for a restore consumer
- props.remove(ConsumerConfig.GROUP_ID_CONFIG);
-
- props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
-
- return props;
- }
-
- private Map<String, Object> getBaseConsumerConfigs() {
- Map<String, Object> props = this.originals();
-
- // set consumer default property values
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
- // remove properties that are not required for consumers
- props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG);
- props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
- props.remove(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
-
- return props;
- }
-
- public Map<String, Object> getProducerConfigs(String clientId) {
- Map<String, Object> props = this.originals();
-
- // set producer default property values
- props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
-
- // remove properties that are not required for producers
- props.remove(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG);
- props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
- props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
-
- props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
-
- return props;
- }
-
- public Serializer keySerializer() {
- return getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
- }
-
- public Serializer valueSerializer() {
- return getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
- }
-
- public Deserializer keyDeserializer() {
- return getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- }
-
- public Deserializer valueDeserializer() {
- return getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- }
-
- public static void main(String[] args) {
- System.out.println(CONFIG.toHtmlTable());
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
deleted file mode 100644
index ebf80b3..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.metrics.Sensor;
-
-public interface StreamingMetrics {
-
- Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
-
- void recordLatency(Sensor sensor, long startNs, long endNs);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
new file mode 100644
index 0000000..3843b1d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+public class StreamsConfig extends AbstractConfig {
+
+ private static final ConfigDef CONFIG;
+
+ /** <code>state.dir</code> */
+ public static final String STATE_DIR_CONFIG = "state.dir";
+ private static final String STATE_DIR_DOC = "Directory location for state store.";
+
+ /** <code>zookeeper.connect</code> */
+ public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
+ private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
+
+ /** <code>commit.interval.ms</code> */
+ public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
+ private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
+
+ /** <code>poll.ms</code> */
+ public static final String POLL_MS_CONFIG = "poll.ms";
+ private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
+
+ /** <code>num.stream.threads</code> */
+ public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
+ private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
+
+ /** <code>num.standby.replicas</code> */
+ public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
+ private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
+
+ /** <code>buffered.records.per.partition</code> */
+ public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
+ private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
+
+ /** <code>state.cleanup.delay.ms</code> */
+ public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
+ private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
+
+ /** <code>total.records.to.process</code> */
+ public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
+ private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
+
+ /** <code>timestamp.extractor</code> */
+ public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
+ private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
+
+ /** <code>partition.grouper</code> */
+ public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
+ private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
+
+ /** <code>job.id</code> */
+ public static final String JOB_ID_CONFIG = "job.id";
+ public static final String JOB_ID_DOC = "An id string to identify the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
+
+ /** <code>key.serializer</code> */
+ public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+
+ /** <code>value.serializer</code> */
+ public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+ /** <code>key.deserializer</code> */
+ public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+
+ /** <code>value.deserializer</code> */
+ public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+ /** <code>metrics.sample.window.ms</code> */
+ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+
+ /** <code>metrics.num.samples</code> */
+ public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+
+ /** <code>metric.reporters</code> */
+ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+
+ /** <code>bootstrap.servers</code> */
+ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+
+ /** <code>client.id</code> */
+ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+
+ private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
+
+ static {
+ CONFIG = new ConfigDef().define(JOB_ID_CONFIG, // required with no default value
+ Type.STRING,
+ Importance.HIGH,
+ StreamsConfig.JOB_ID_DOC)
+ .define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value
+ Type.STRING,
+ Importance.HIGH,
+ CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+ .define(CLIENT_ID_CONFIG,
+ Type.STRING,
+ "",
+ Importance.HIGH,
+ CommonClientConfigs.CLIENT_ID_DOC)
+ .define(ZOOKEEPER_CONNECT_CONFIG,
+ Type.STRING,
+ "",
+ Importance.HIGH,
+ StreamsConfig.ZOOKEEPER_CONNECT_DOC)
+ .define(STATE_DIR_CONFIG,
+ Type.STRING,
+ SYSTEM_TEMP_DIRECTORY,
+ Importance.HIGH,
+ STATE_DIR_DOC)
+ .define(KEY_SERIALIZER_CLASS_CONFIG, // required with no default value
+ Type.CLASS,
+ Importance.HIGH,
+ ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
+ .define(VALUE_SERIALIZER_CLASS_CONFIG, // required with no default value
+ Type.CLASS,
+ Importance.HIGH,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
+ .define(KEY_DESERIALIZER_CLASS_CONFIG, // required with no default value
+ Type.CLASS,
+ Importance.HIGH,
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
+ .define(VALUE_DESERIALIZER_CLASS_CONFIG, // required with no default value
+ Type.CLASS,
+ Importance.HIGH,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
+ .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+ Type.CLASS,
+ Importance.MEDIUM,
+ TIMESTAMP_EXTRACTOR_CLASS_DOC)
+ .define(PARTITION_GROUPER_CLASS_CONFIG,
+ Type.CLASS,
+ DefaultPartitionGrouper.class,
+ Importance.MEDIUM,
+ PARTITION_GROUPER_CLASS_DOC)
+ .define(COMMIT_INTERVAL_MS_CONFIG,
+ Type.LONG,
+ 30000,
+ Importance.LOW,
+ COMMIT_INTERVAL_MS_DOC)
+ .define(POLL_MS_CONFIG,
+ Type.LONG,
+ 100,
+ Importance.LOW,
+ POLL_MS_DOC)
+ .define(NUM_STREAM_THREADS_CONFIG,
+ Type.INT,
+ 1,
+ Importance.LOW,
+ NUM_STREAM_THREADS_DOC)
+ .define(NUM_STANDBY_REPLICAS_CONFIG,
+ Type.INT,
+ 0,
+ Importance.LOW,
+ NUM_STANDBY_REPLICAS_DOC)
+ .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+ Type.INT,
+ 1000,
+ Importance.LOW,
+ BUFFERED_RECORDS_PER_PARTITION_DOC)
+ .define(STATE_CLEANUP_DELAY_MS_CONFIG,
+ Type.LONG,
+ 60000,
+ Importance.LOW,
+ STATE_CLEANUP_DELAY_MS_DOC)
+ .define(TOTAL_RECORDS_TO_PROCESS,
+ Type.LONG,
+ -1L,
+ Importance.LOW,
+ TOTAL_RECORDS_TO_DOC)
+ .define(METRIC_REPORTER_CLASSES_CONFIG,
+ Type.LIST,
+ "",
+ Importance.LOW,
+ CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+ .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+ Type.LONG,
+ 30000,
+ atLeast(0),
+ Importance.LOW,
+ CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+ .define(METRICS_NUM_SAMPLES_CONFIG,
+ Type.INT,
+ 2,
+ atLeast(1),
+ Importance.LOW,
+ CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
+ }
+
+ public static class InternalConfig {
+ public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
+ }
+
+ public StreamsConfig(Map<?, ?> props) {
+ super(CONFIG, props);
+ }
+
+ public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
+ Map<String, Object> props = getBaseConsumerConfigs();
+
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
+ props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
+ props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
+
+ props.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
+
+ return props;
+ }
+
+ public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
+ Map<String, Object> props = getBaseConsumerConfigs();
+
+ // no need to set group id for a restore consumer
+ props.remove(ConsumerConfig.GROUP_ID_CONFIG);
+
+ props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
+
+ return props;
+ }
+
+ private Map<String, Object> getBaseConsumerConfigs() {
+ Map<String, Object> props = this.originals();
+
+ // set consumer default property values
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ // remove properties that are not required for consumers
+ props.remove(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ props.remove(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+ props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+
+ return props;
+ }
+
+ public Map<String, Object> getProducerConfigs(String clientId) {
+ Map<String, Object> props = this.originals();
+
+ // set producer default property values
+ props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+
+ // remove properties that are not required for producers
+ props.remove(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ props.remove(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+
+ props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
+
+ return props;
+ }
+
+ public Serializer keySerializer() {
+ return getConfiguredInstance(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ }
+
+ public Serializer valueSerializer() {
+ return getConfiguredInstance(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ }
+
+ public Deserializer keyDeserializer() {
+ return getConfiguredInstance(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ }
+
+ public Deserializer valueDeserializer() {
+ return getConfiguredInstance(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ }
+
+ public static void main(String[] args) {
+ System.out.println(CONFIG.toHtmlTable());
+ }
+}
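
A hedged sketch of how the definitions above behave, showing the required settings and the per-client config derivation. The job id and broker address are placeholders, WallclockTimestampExtractor is assumed to be the extractor from the bundled examples, and getConsumerConfigs is omitted because it needs a live StreamThread.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.examples.WallclockTimestampExtractor;

    public class StreamsConfigSketch {

        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.JOB_ID_CONFIG, "config-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
            StreamsConfig config = new StreamsConfig(props);

            // Producer configs drop the deserializers and timestamp extractor
            // and get a suffixed client.id.
            Map<String, Object> producerProps = config.getProducerConfigs("sketch-1");
            System.out.println(producerProps.get("client.id")); // sketch-1-producer

            // The restore consumer additionally runs without a group.id.
            Map<String, Object> restoreProps = config.getRestoreConsumerConfigs("sketch-1");
            System.out.println(restoreProps.get("client.id")); // sketch-1-restore-consumer
        }
    }
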
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
new file mode 100644
index 0000000..a151392
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.metrics.Sensor;
+
+public interface StreamsMetrics {
+
+ Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
+
+ void recordLatency(Sensor sensor, long startNs, long endNs);
+}
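
The renamed interface stays behavior-free; below is a hedged sketch of the calling pattern it implies. The StreamsMetrics instance would be supplied by the runtime, and the scope, entity and operation names are made up for illustration.

    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.streams.StreamsMetrics;

    public class LatencySketch {

        private final StreamsMetrics metrics;
        private final Sensor processSensor;

        public LatencySketch(StreamsMetrics metrics) {
            this.metrics = metrics;
            // Register the sensor once, up front; tags are optional varargs.
            this.processSensor = metrics.addLatencySensor("kstream", "word-count", "process");
        }

        // Wrap any unit of work and record its latency in nanoseconds.
        public void timed(Runnable work) {
            long startNs = System.nanoTime();
            work.run();
            metrics.recordLatency(processSensor, startNs, System.nanoTime());
        }
    }
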
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
index 88a8955..a234395 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
@@ -20,11 +20,11 @@ package org.apache.kafka.streams.examples;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.KafkaStreaming;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
@@ -34,14 +34,14 @@ public class KStreamJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
- props.put(StreamingConfig.JOB_ID_CONFIG, "example-kstream");
- props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
- StreamingConfig config = new StreamingConfig(props);
+ props.put(StreamsConfig.JOB_ID_CONFIG, "example-kstream");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+ StreamsConfig config = new StreamsConfig(props);
KStreamBuilder builder = new KStreamBuilder();
@@ -78,7 +78,7 @@ public class KStreamJob {
streams[0].to("topic2");
streams[1].to("topic3");
- KafkaStreaming kstream = new KafkaStreaming(builder, config);
+ KafkaStreams kstream = new KafkaStreams(builder, config);
kstream.start();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 2d0b79f..e17c16b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -21,13 +21,13 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreaming;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.Entry;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
@@ -70,11 +70,11 @@ public class ProcessorJob {
KeyValueIterator<String, Integer> iter = this.kvStore.all();
while (iter.hasNext()) {
- Entry<String, Integer> entry = iter.next();
+ KeyValue<String, Integer> entry = iter.next();
- System.out.println("[" + entry.key() + ", " + entry.value() + "]");
+ System.out.println("[" + entry.key + ", " + entry.value + "]");
- context.forward(entry.key(), entry.value());
+ context.forward(entry.key, entry.value);
}
iter.close();
@@ -90,15 +90,15 @@ public class ProcessorJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
- props.put(StreamingConfig.JOB_ID_CONFIG, "example-processor");
- props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
- props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
- StreamingConfig config = new StreamingConfig(props);
+ props.put(StreamsConfig.JOB_ID_CONFIG, "example-processor");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+ props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+ StreamsConfig config = new StreamsConfig(props);
TopologyBuilder builder = new TopologyBuilder();
@@ -109,7 +109,7 @@ public class ProcessorJob {
builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
- KafkaStreaming streaming = new KafkaStreaming(builder, config);
- streaming.start();
+ KafkaStreams streams = new KafkaStreams(builder, config);
+ streams.start();
}
}
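
With Entry gone, range scans over a state store now hand back the shared KeyValue type directly. A hedged sketch of the pattern the ProcessorJob change illustrates: the store would come from ProcessorContext.getStateStore in real code, and the try/finally around close() is an optional hardening not present in the example.

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class StoreScanSketch {

        // Dump a store's contents; kvStore would be obtained from the
        // ProcessorContext in a real processor.
        static void dump(KeyValueStore<String, Integer> kvStore) {
            KeyValueIterator<String, Integer> iter = kvStore.all();
            try {
                while (iter.hasNext()) {
                    KeyValue<String, Integer> entry = iter.next();
                    System.out.println("[" + entry.key + ", " + entry.value + "]");
                }
            } finally {
                iter.close(); // iterators hold resources (e.g. RocksDB); always close them
            }
        }
    }
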
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index dfed661..26f04f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorSupplier;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 87298d1..feb28ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
/**
* KTable is an abstraction of a change log stream.
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
deleted file mode 100644
index f633f6e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public class KeyValue<K, V> {
-
- public final K key;
- public final V value;
-
- public KeyValue(K key, V value) {
- this.key = key;
- this.value = value;
- }
-
- public static <K, V> KeyValue<K, V> pair(K key, V value) {
- return new KeyValue<>(key, value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 91bfa9e..26002f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -18,7 +18,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index daef8b1..ff7f9ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ce89220..98e50c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -36,7 +36,7 @@ import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.Serdes;
@@ -217,14 +217,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
String name = topology.newName(SINK_NAME);
- StreamPartitioner<K, V> streamPartitioner = null;
+ StreamsPartitioner<K, V> streamsPartitioner = null;
if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
- streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
+ streamsPartitioner = (StreamsPartitioner<K, V>) new WindowedStreamsPartitioner<Object, V>(windowedSerializer);
}
- topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name);
+ topology.addSink(name, topic, keySerializer, valSerializer, streamsPartitioner, this.name);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 01e3325..a4ac9b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index 57f1431..a40449b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 7d6eb27..c484c7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index a9d8f97..4299c66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.Processor;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 8ee557c..d046090 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Reducer;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 12fcc17..499f721 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -18,7 +18,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
deleted file mode 100644
index 10e69cc..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.StreamPartitioner;
-
-public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
-
- private final WindowedSerializer<K> serializer;
-
- public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
- this.serializer = serializer;
- }
-
- /**
- * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value
- * and the current number of partitions. The partition number is determined by the original key of the windowed key,
- * using the same logic as DefaultPartitioner, so that the topic is partitioned by the original key.
- *
- * @param windowedKey the key of the message
- * @param value the value of the message
- * @param numPartitions the total number of partitions
- * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
- */
- public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
- byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
-
- // hash the keyBytes to choose a partition
- return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
- }
-
- private static int toPositive(int number) {
- return number & 0x7fffffff;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
new file mode 100644
index 0000000..ff1fa2c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
+
+public class WindowedStreamsPartitioner<K, V> implements StreamsPartitioner<Windowed<K>, V> {
+
+ private final WindowedSerializer<K> serializer;
+
+ public WindowedStreamsPartitioner(WindowedSerializer<K> serializer) {
+ this.serializer = serializer;
+ }
+
+ /**
+ * WindowedStreamsPartitioner determines the partition number for a message with the given windowed key and value
+ * and the current number of partitions. The partition number is determined by the original key of the windowed key,
+ * using the same logic as DefaultPartitioner, so that the topic is partitioned by the original key.
+ *
+ * @param windowedKey the key of the message
+ * @param value the value of the message
+ * @param numPartitions the total number of partitions
+ * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+ */
+ public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
+ byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
+
+ // hash the keyBytes to choose a partition
+ return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+ }
+
+ private static int toPositive(int number) {
+ return number & 0x7fffffff;
+ }
+
+}
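The partitioning scheme above deliberately mirrors DefaultPartitioner: murmur2 over the serialized base key, with the sign bit masked off before taking the modulus, so all records sharing an original key land on the same partition regardless of window. Masking with number & 0x7fffffff is used instead of Math.abs because Math.abs(Integer.MIN_VALUE) is still negative. A standalone sketch of the same arithmetic, using only Utils.murmur2 from kafka-clients (the key bytes here stand in for serializeBaseKey output):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.utils.Utils;

    public class PartitionMathDemo {
        public static void main(String[] args) {
            byte[] keyBytes = "user-42".getBytes(StandardCharsets.UTF_8);
            int numPartitions = 3;

            // Mask the sign bit, then take the modulus: always in [0, numPartitions - 1].
            int partition = (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;

            System.out.println("partition = " + partition);
        }
    }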
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index fa19ed7..41e2235 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsMetrics;
import java.io.File;
@@ -70,9 +70,9 @@ public interface ProcessorContext {
/**
* Returns Metrics instance
*
- * @return StreamingMetrics
+ * @return StreamsMetrics
*/
- StreamingMetrics metrics();
+ StreamsMetrics metrics();
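For implementors of Processor, only the returned type name changes here; the object handed back by context.metrics() is used as before. A minimal sketch of a processor timing its process() calls, assuming the addLatencySensor/recordLatency surface that StreamsMetrics exposes at this point in the codebase (the class and sensor names are illustrative):

    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.streams.StreamsMetrics;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class TimedProcessor extends AbstractProcessor<String, String> {
        private StreamsMetrics metrics;
        private Sensor latency;

        @Override
        public void init(ProcessorContext context) {
            super.init(context);
            metrics = context.metrics();
            latency = metrics.addLatencySensor("processor", "timed-processor", "process");
        }

        @Override
        public void process(String key, String value) {
            long startNs = System.nanoTime();
            // ... real per-record work would go here ...
            metrics.recordLatency(latency, startNs, System.nanoTime());
        }
    }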
/**
* Registers and possibly restores the specified storage engine.