Posted to commits@kafka.apache.org by gw...@apache.org on 2016/01/22 22:00:11 UTC
[3/4] kafka git commit: KAFKA-3136: Rename KafkaStreaming to KafkaStreams
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
deleted file mode 100644
index f14d9d9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-/**
- * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
- * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
- * <p>
- * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it,
- * using multiple partitions allows the topic to scale beyond a size that fits on a single machine. Partitions also enable you
- * to use multiple instances of your topology to process all of the messages on the topology's source topics in parallel.
- * <p>
- * When a topology is instantiated, each of its sources is assigned a subset of its topic's partitions. That means that only
- * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
- * automatically manage these instances, and adjust when new topology instances are added or removed.
- * <p>
- * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
- * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
- * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
- * determine to which partition each message should be written.
- * <p>
- * To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
- * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
- * for that topic.
- * <p>
- * All StreamPartitioner implementations should be stateless, pure functions so they can be shared across topics and sink nodes.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
- * org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...)
- * @see TopologyBuilder#addSink(String, String, StreamPartitioner, String...)
- */
-public interface StreamPartitioner<K, V> {
-
- /**
- * Determine the partition number for a message, given its key and value and the current number of partitions.
- *
- * @param key the key of the message
- * @param value the value of the message
- * @param numPartitions the total number of partitions
- * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
- */
- Integer partition(K key, V value, int numPartitions);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
new file mode 100644
index 0000000..f8d199d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
+ * <p>
+ * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it,
+ * using multiple partitions allows the topic to scale beyond a size that fits on a single machine. Partitions also enable you
+ * to use multiple instances of your topology to process all of the messages on the topology's source topics in parallel.
+ * <p>
+ * When a topology is instantiated, each of its sources is assigned a subset of its topic's partitions. That means that only
+ * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will
+ * automatically manage these instances, and adjust when new topology instances are added or removed.
+ * <p>
+ * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
+ * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
+ * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
+ * determine to which partition each message should be written.
+ * <p>
+ * To do this, create a <code>StreamsPartitioner</code> implementation, and when you build your topology specify that custom partitioner
+ * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...) adding a sink}
+ * for that topic.
+ * <p>
+ * All StreamsPartitioner implementations should be stateless, pure functions so they can be shared across topics and sink nodes.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
+ * org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...)
+ * @see TopologyBuilder#addSink(String, String, StreamsPartitioner, String...)
+ */
+public interface StreamsPartitioner<K, V> {
+
+ /**
+ * Determine the partition number for a message, given its key and value and the current number of partitions.
+ *
+ * @param key the key of the message
+ * @param value the value of the message
+ * @param numPartitions the total number of partitions
+ * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+ */
+ Integer partition(K key, V value, int numPartitions);
+}
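
For context, a minimal sketch of what an implementation of the renamed interface could look like; the class name KeyHashPartitioner and its hashing rule are illustrative assumptions, not part of this commit:

    import org.apache.kafka.streams.processor.StreamsPartitioner;

    /**
     * A stateless, pure-function partitioner that routes every message with the
     * same key to the same partition. Returning null for a null key defers to
     * the producer's default partitioning logic, as the interface contract allows.
     */
    public class KeyHashPartitioner implements StreamsPartitioner<String, String> {
        @Override
        public Integer partition(String key, String value, int numPartitions) {
            if (key == null)
                return null;
            // mask the sign bit so the result stays within [0, numPartitions - 1]
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }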
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index d6b63d2..f4e6821 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -46,8 +46,8 @@ import java.util.Set;
* its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
* processes those messages, and optionally forwards new messages to one or all of its children. Finally, a {@link SinkNode sink}
* is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
- * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreaming}
- * instance that will then {@link org.apache.kafka.streams.KafkaStreaming#start() begin consuming, processing, and producing messages}.
+ * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
+ * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing messages}.
*/
public class TopologyBuilder {
@@ -135,9 +135,9 @@ public class TopologyBuilder {
public final String topic;
private Serializer keySerializer;
private Serializer valSerializer;
- private final StreamPartitioner partitioner;
+ private final StreamsPartitioner partitioner;
- private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner) {
+ private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamsPartitioner partitioner) {
super(name);
this.parents = parents.clone();
this.topic = topic;
@@ -188,9 +188,9 @@ public class TopologyBuilder {
/**
* Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
- * The source will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
- * {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
+ * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
+ * {@link org.apache.kafka.streams.StreamsConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
*
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
@@ -208,11 +208,11 @@ public class TopologyBuilder {
* @param name the unique name of the source used to reference this node when
* {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
* @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
- * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
- * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
- * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param topics the name of one or more Kafka topics that this source is to consume
* @return this builder instance so methods can be chained together; never null
*/
@@ -236,18 +236,18 @@ public class TopologyBuilder {
/**
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
- * The sink will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
- * {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
- * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
+ * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+ * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
*
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
- * @see #addSink(String, String, StreamPartitioner, String...)
+ * @see #addSink(String, String, StreamsPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
- * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+ * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
*/
public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
@@ -256,11 +256,11 @@ public class TopologyBuilder {
/**
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using
* the supplied partitioner.
- * The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
- * {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
- * {@link StreamingConfig streaming configuration}.
+ * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+ * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
* <p>
- * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
+ * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
* the named Kafka topic's partitions. Such control is often useful with topologies that use
* {@link #addStateStore(StateStoreSupplier, String...) state stores}
* in their processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@@ -274,9 +274,9 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
- * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+ * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
*/
- public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) {
+ public final TopologyBuilder addSink(String name, String topic, StreamsPartitioner partitioner, String... parentNames) {
return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames);
}
@@ -284,7 +284,7 @@ public class TopologyBuilder {
* Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
* The sink will use the specified key and value serializers.
* <p>
- * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
+ * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among
* the named Kafka topic's partitions. Such control is often useful with topologies that use
* {@link #addStateStore(StateStoreSupplier, String...) state stores}
* in their processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
@@ -293,20 +293,20 @@ public class TopologyBuilder {
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param keySerializer the {@link Serializer key serializer} used when writing messages; may be null if the sink
- * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
- * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param valSerializer the {@link Serializer value serializer} used when writing messages; may be null if the sink
- * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
- * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
- * @see #addSink(String, String, StreamPartitioner, String...)
- * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+ * @see #addSink(String, String, StreamsPartitioner, String...)
+ * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...)
*/
public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
- return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames);
+ return addSink(name, topic, keySerializer, valSerializer, (StreamsPartitioner) null, parentNames);
}
/**
@@ -316,20 +316,20 @@ public class TopologyBuilder {
* @param name the unique name of the sink
* @param topic the name of the Kafka topic to which this sink should write its messages
* @param keySerializer the {@link Serializer key serializer} used when writing messages; may be null if the sink
- * should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
- * {@link StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param valSerializer the {@link Serializer value serializer} used when writing messages; may be null if the sink
- * should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
- * {@link StreamingConfig streaming configuration}
+ * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+ * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
* @param partitioner the function that should be used to determine the partition for each message processed by the sink
* @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
* and write to its topic
* @return this builder instance so methods can be chained together; never null
* @see #addSink(String, String, String...)
- * @see #addSink(String, String, StreamPartitioner, String...)
+ * @see #addSink(String, String, StreamsPartitioner, String...)
* @see #addSink(String, String, Serializer, Serializer, String...)
*/
- public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
+ public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner, String... parentNames) {
if (nodeFactories.containsKey(name))
throw new TopologyException("Processor " + name + " is already added.");
@@ -589,9 +589,9 @@ public class TopologyBuilder {
/**
* Build the topology for the specified topic group. This is called automatically when passing this builder into the
- * {@link org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)} constructor.
+ * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor.
*
- * @see org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)
+ * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
*/
public ProcessorTopology build(Integer topicGroupId) {
Set<String> nodeGroup;
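
Putting the renamed pieces together, a hedged sketch of how a builder-assembled topology is handed to the renamed KafkaStreams entry point; the topic names, the KeyHashPartitioner from the sketch above, and the "job.id" key are assumptions (only the constructors and the addSource/addSink overloads are confirmed by this diff):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class TopologyExample {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            // read "input-topic" and write to "output-topic", routing messages
            // through the custom StreamsPartitioner sketched earlier
            builder.addSource("source", new StringDeserializer(), new StringDeserializer(), "input-topic")
                   .addSink("sink", "output-topic",
                            new StringSerializer(), new StringSerializer(),
                            new KeyHashPartitioner(), "source");

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // standard client config
            props.put("job.id", "topology-example");          // config key assumed for this era of the code base

            KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
            streams.start();
        }
    }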
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 569f4e6..ef4c3c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
@@ -48,7 +48,7 @@ public abstract class AbstractTask {
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
- StreamingConfig config,
+ StreamsConfig config,
boolean isStandby) {
this.id = id;
this.partitions = new HashSet<>(partitions);
@@ -57,7 +57,7 @@ public abstract class AbstractTask {
// create the processor state manager
try {
- File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
+ File stateFile = new File(config.getString(StreamsConfig.STATE_DIR_CONFIG), id.toString());
// if partitions is null, this is a standby task
this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
} catch (IOException e) {
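
The STATE_DIR_CONFIG lookup above is what roots each task's local state on disk; a small sketch of the resulting layout, where the directory value and the task id rendering are assumptions:

    // given StreamsConfig.STATE_DIR_CONFIG = "/tmp/kafka-streams" and a task id
    // that renders as "1_2", AbstractTask resolves the task's state directory as
    File stateFile = new File("/tmp/kafka-streams", "1_2"); // -> /tmp/kafka-streams/1_2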
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
deleted file mode 100644
index 2734f56..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ /dev/null
@@ -1,483 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
-import org.apache.kafka.streams.processor.internals.assignment.ClientState;
-import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignmentException;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.zookeeper.ZooDefs;
-
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.I0Itec.zkclient.ZkClient;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Configurable {
-
- private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
-
- private StreamThread streamThread;
-
- private int numStandbyReplicas;
- private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
- private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
- private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
- private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
- private Map<TaskId, Set<TopicPartition>> standbyTasks;
-
- // TODO: the following ZK dependency should be removed after KIP-4
- private static final String ZK_TOPIC_PATH = "/brokers/topics";
- private static final String ZK_BROKER_PATH = "/brokers/ids";
- private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
-
- private ZkClient zkClient;
-
- private class ZKStringSerializer implements ZkSerializer {
-
- @Override
- public byte[] serialize(Object data) {
- try {
- return ((String) data).getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new AssertionError(e);
- }
- }
-
- @Override
- public Object deserialize(byte[] bytes) {
- try {
- if (bytes == null)
- return null;
- else
- return new String(bytes, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new AssertionError(e);
- }
- }
- }
-
- private List<Integer> getBrokers() {
- List<Integer> brokers = new ArrayList<>();
- for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
- brokers.add(Integer.parseInt(broker));
- }
- Collections.sort(brokers);
-
- log.debug("Read brokers {} from ZK in partition assignor.", brokers);
-
- return brokers;
- }
-
- @SuppressWarnings("unchecked")
- private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
- String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
-
- if (data == null) return null;
-
- try {
- ObjectMapper mapper = new ObjectMapper();
-
- Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
-
- });
-
- Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
-
- log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
-
- return partitions;
- } catch (IOException e) {
- throw new KafkaException(e);
- }
- }
-
- private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
- log.debug("Creating topic {} with {} partitions in ZK from the partition assignor.", topic, numPartitions);
-
- // we always assign leaders to brokers starting at the first one with replication factor 1
- List<Integer> brokers = getBrokers();
-
- Map<Integer, List<Integer>> assignment = new HashMap<>();
- for (int i = 0; i < numPartitions; i++) {
- assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size())));
- }
-
- // try to write to ZK with open ACL
- try {
- Map<String, Object> dataMap = new HashMap<>();
- dataMap.put("version", 1);
- dataMap.put("partitions", assignment);
-
- ObjectMapper mapper = new ObjectMapper();
- String data = mapper.writeValueAsString(dataMap);
-
- zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
- } catch (JsonProcessingException e) {
- throw new KafkaException(e);
- }
- }
-
- private void deleteTopic(String topic) throws ZkNodeExistsException {
- log.debug("Deleting topic {} from ZK in partition assignor.", topic);
-
- zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
- }
-
- private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
- log.debug("Adding {} partitions to topic {} in ZK with existing partitions assigned as {} in partition assignor.", numPartitions, topic, existingAssignment);
-
- // we always assign new leaders to brokers starting at the last broker of the existing assignment with replication factor 1
- List<Integer> brokers = getBrokers();
-
- int startIndex = existingAssignment.size();
-
- Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
-
- for (int i = 0; i < numPartitions; i++) {
- newAssignment.put(i + startIndex, Collections.singletonList(brokers.get((i + startIndex) % brokers.size()))); // wrap the broker index, not the broker id
- }
-
- // try to write to ZK with open ACL
- try {
- Map<String, Object> dataMap = new HashMap<>();
- dataMap.put("version", 1);
- dataMap.put("partitions", newAssignment);
-
- ObjectMapper mapper = new ObjectMapper();
- String data = mapper.writeValueAsString(dataMap);
-
- zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
- } catch (JsonProcessingException e) {
- throw new KafkaException(e);
- }
- }
-
- /**
- * We need the PartitionAssignor and its StreamThread to be mutually accessible,
- * since the former needs the latter's cached metadata when sending subscriptions,
- * and the latter needs the former's returned assignment when adding tasks.
- */
- @Override
- public void configure(Map<String, ?> configs) {
- numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
-
- Object o = configs.get(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE);
- if (o == null) {
- KafkaException ex = new KafkaException("StreamThread is not specified");
- log.error(ex.getMessage(), ex);
- throw ex;
- }
-
- if (!(o instanceof StreamThread)) {
- KafkaException ex = new KafkaException(o.getClass().getName() + " is not an instance of " + StreamThread.class.getName());
- log.error(ex.getMessage(), ex);
- throw ex;
- }
-
- streamThread = (StreamThread) o;
- streamThread.partitionAssignor(this);
-
- this.topicGroups = streamThread.builder.topicGroups();
-
- if (configs.containsKey(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG))
- zkClient = new ZkClient((String) configs.get(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer());
- }
-
- @Override
- public String name() {
- return "streaming";
- }
-
- @Override
- public Subscription subscription(Set<String> topics) {
- // Adds the following information to the subscription:
- // 1. Client UUID (a unique id assigned to an instance of KafkaStreaming)
- // 2. Task ids of previously running tasks
- // 3. Task ids of valid local states in the client's state directory.
-
- Set<TaskId> prevTasks = streamThread.prevTasks();
- Set<TaskId> standbyTasks = streamThread.cachedTasks();
- standbyTasks.removeAll(prevTasks);
- SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
-
- return new Subscription(new ArrayList<>(topics), data.encode());
- }
-
- @Override
- public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
- // This assigns tasks to consumer clients in two steps.
- // 1. Using the TaskAssignor, tasks are assigned to streaming clients.
- //    - Assign a task to the client that was running it previously.
- //      If there is no such client, assign the task to a client that has a valid local state for it.
- //    - A client may have more than one stream thread.
- //      The assignor tries to assign tasks to a client proportionally to its number of threads.
- //    - We try not to assign the same set of tasks to two different clients.
- //      We do the assignment in one pass, so the result may not satisfy all of the above.
- // 2. Within each client, tasks are assigned to consumers in a round-robin manner.
- Map<UUID, Set<String>> consumersByClient = new HashMap<>();
- Map<UUID, ClientState<TaskId>> states = new HashMap<>();
-
- // Decode subscription info
- for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
- String consumerId = entry.getKey();
- Subscription subscription = entry.getValue();
-
- SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
-
- Set<String> consumers = consumersByClient.get(info.processId);
- if (consumers == null) {
- consumers = new HashSet<>();
- consumersByClient.put(info.processId, consumers);
- }
- consumers.add(consumerId);
-
- ClientState<TaskId> state = states.get(info.processId);
- if (state == null) {
- state = new ClientState<>();
- states.put(info.processId, state);
- }
-
- state.prevActiveTasks.addAll(info.prevTasks);
- state.prevAssignedTasks.addAll(info.prevTasks);
- state.prevAssignedTasks.addAll(info.standbyTasks);
- state.capacity = state.capacity + 1d;
- }
-
- // get the tasks as partition groups from the partition grouper
- Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
- sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
- }
- Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
-
- // add tasks to state topic subscribers
- stateChangelogTopicToTaskIds = new HashMap<>();
- internalSourceTopicToTaskIds = new HashMap<>();
- for (TaskId task : partitionsForTask.keySet()) {
- for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics) {
- Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName);
- if (tasks == null) {
- tasks = new HashSet<>();
- stateChangelogTopicToTaskIds.put(stateName, tasks);
- }
-
- tasks.add(task);
- }
-
- for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) {
- Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName);
- if (tasks == null) {
- tasks = new HashSet<>();
- internalSourceTopicToTaskIds.put(topicName, tasks);
- }
-
- tasks.add(task);
- }
- }
-
- // assign tasks to clients
- states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
- Map<String, Assignment> assignment = new HashMap<>();
-
- for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
- UUID processId = entry.getKey();
- Set<String> consumers = entry.getValue();
- ClientState<TaskId> state = states.get(processId);
-
- ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
- final int numActiveTasks = state.activeTasks.size();
- for (TaskId taskId : state.activeTasks) {
- taskIds.add(taskId);
- }
- for (TaskId id : state.assignedTasks) {
- if (!state.activeTasks.contains(id))
- taskIds.add(id);
- }
-
- final int numConsumers = consumers.size();
- List<TaskId> active = new ArrayList<>();
- Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
-
- int i = 0;
- for (String consumer : consumers) {
- List<TopicPartition> activePartitions = new ArrayList<>();
-
- final int numTaskIds = taskIds.size();
- for (int j = i; j < numTaskIds; j += numConsumers) {
- TaskId taskId = taskIds.get(j);
- if (j < numActiveTasks) {
- for (TopicPartition partition : partitionsForTask.get(taskId)) {
- activePartitions.add(partition);
- active.add(taskId);
- }
- } else {
- Set<TopicPartition> standbyPartitions = standby.get(taskId);
- if (standbyPartitions == null) {
- standbyPartitions = new HashSet<>();
- standby.put(taskId, standbyPartitions);
- }
- standbyPartitions.addAll(partitionsForTask.get(taskId));
- }
- }
-
- AssignmentInfo data = new AssignmentInfo(active, standby);
- assignment.put(consumer, new Assignment(activePartitions, data.encode()));
- i++;
-
- active.clear();
- standby.clear();
- }
- }
-
- // if ZK is specified, get the tasks / internal topics for each state topic and validate the topic partitions
- if (zkClient != null) {
- log.debug("Starting to validate changelog topics in partition assignor.");
-
- Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
- topicToTaskIds.putAll(stateChangelogTopicToTaskIds);
- topicToTaskIds.putAll(internalSourceTopicToTaskIds);
-
- for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
- String topic = streamThread.jobId + "-" + entry.getKey();
-
- // the expected number of partitions is the max value of TaskId.partition + 1
- int numPartitions = 0;
- for (TaskId task : entry.getValue()) {
- if (numPartitions < task.partition + 1)
- numPartitions = task.partition + 1;
- }
-
- boolean topicNotReady = true;
-
- while (topicNotReady) {
- Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
-
- // if topic does not exist, create it
- if (topicMetadata == null) {
- try {
- createTopic(topic, numPartitions);
- } catch (ZkNodeExistsException e) {
- // ignore and continue
- }
- } else {
- if (topicMetadata.size() > numPartitions) {
- // else if topic exists with more #.partitions than needed, delete in order to re-create it
- try {
- deleteTopic(topic);
- } catch (ZkNodeExistsException e) {
- // ignore and continue
- }
- } else if (topicMetadata.size() < numPartitions) {
- // else if topic exists with less #.partitions than needed, add partitions
- try {
- addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
- } catch (ZkNoNodeException e) {
- // ignore and continue
- }
- }
-
- topicNotReady = false;
- }
- }
-
- // wait until the topic metadata has been propagated to all brokers
- List<PartitionInfo> partitions;
- do {
- partitions = streamThread.restoreConsumer.partitionsFor(topic);
- } while (partitions == null || partitions.size() != numPartitions);
- }
-
- log.info("Completed validating changelog topics in partition assignor.");
- }
-
- return assignment;
- }
-
- @Override
- public void onAssignment(Assignment assignment) {
- List<TopicPartition> partitions = assignment.partitions();
-
- AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
- this.standbyTasks = info.standbyTasks;
-
- Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
- Iterator<TaskId> iter = info.activeTasks.iterator();
- for (TopicPartition partition : partitions) {
- Set<TaskId> taskIds = partitionToTaskIds.get(partition);
- if (taskIds == null) {
- taskIds = new HashSet<>();
- partitionToTaskIds.put(partition, taskIds);
- }
-
- if (iter.hasNext()) {
- taskIds.add(iter.next());
- } else {
- TaskAssignmentException ex = new TaskAssignmentException(
- "failed to find a task id for the partition=" + partition.toString() +
- ", partitions=" + partitions.size() + ", assignmentInfo=" + info.toString()
- );
- log.error(ex.getMessage(), ex);
- throw ex;
- }
- }
- this.partitionToTaskIds = partitionToTaskIds;
- }
-
- /* For Test Only */
- public Set<TaskId> tasksForState(String stateName) {
- return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
- }
-
- public Set<TaskId> tasksForPartition(TopicPartition partition) {
- return partitionToTaskIds.get(partition);
- }
-
- public Map<TaskId, Set<TopicPartition>> standbyTasks() {
- return standbyTasks;
- }
-}
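
For reference, a hedged sketch of how this (now deleted) assignor was wired up through configure(); all three config keys appear in the file above, while the concrete values and the test-style setup are assumptions:

    Map<String, Object> configs = new HashMap<>();
    configs.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
    // the assignor and its StreamThread must be mutually accessible, so the
    // thread instance is passed in through an internal config entry
    configs.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
    configs.put(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");

    KafkaStreamingPartitionAssignor assignor = new KafkaStreamingPartitionAssignor();
    assignor.configure(configs); // throws KafkaException if the thread entry is missing or mistyped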
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 3429df3..102c534 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -38,7 +38,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
private final TaskId id;
private final StreamTask task;
- private final StreamingMetrics metrics;
+ private final StreamsMetrics metrics;
private final RecordCollector collector;
private final ProcessorStateManager stateMgr;
@@ -52,10 +52,10 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
@SuppressWarnings("unchecked")
public ProcessorContextImpl(TaskId id,
StreamTask task,
- StreamingConfig config,
+ StreamsConfig config,
RecordCollector collector,
ProcessorStateManager stateMgr,
- StreamingMetrics metrics) {
+ StreamsMetrics metrics) {
this.id = id;
this.task = task;
this.metrics = metrics;
@@ -113,7 +113,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
}
@Override
- public StreamingMetrics metrics() {
+ public StreamsMetrics metrics() {
return metrics;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index fe0472e..25c663d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +72,7 @@ public class RecordCollector {
}
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
- StreamPartitioner<K, V> partitioner) {
+ StreamsPartitioner<K, V> partitioner) {
byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
Integer partition = null;
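
The hunk ends just before the partitioner is consulted; a hedged sketch of how the remainder of such a send method could use it (an illustration, not the verbatim continuation of this file, and the producer field is assumed):

    // resolve the partition explicitly when a custom partitioner is supplied
    if (partitioner != null) {
        List<PartitionInfo> partitions = this.producer.partitionsFor(record.topic());
        if (partitions != null && !partitions.isEmpty())
            partition = partitioner.partition(record.key(), record.value(), partitions.size());
    }
    // hand the serialized bytes, and the chosen partition if any, to the producer
    this.producer.send(new ProducerRecord<>(record.topic(), partition, keyBytes, valBytes));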
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 7ab59ee..88b3f56 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -20,18 +20,18 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
public class SinkNode<K, V> extends ProcessorNode<K, V> {
private final String topic;
private Serializer<K> keySerializer;
private Serializer<V> valSerializer;
- private final StreamPartitioner<K, V> partitioner;
+ private final StreamsPartitioner<K, V> partitioner;
private ProcessorContext context;
- public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner) {
+ public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner) {
super(name);
this.topic = topic;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index e0583e3..89d185c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -20,8 +20,8 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -36,7 +36,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class);
private final TaskId id;
- private final StreamingMetrics metrics;
+ private final StreamsMetrics metrics;
private final ProcessorStateManager stateMgr;
private final Serializer<?> keySerializer;
@@ -47,9 +47,9 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
private boolean initialized;
public StandbyContextImpl(TaskId id,
- StreamingConfig config,
+ StreamsConfig config,
ProcessorStateManager stateMgr,
- StreamingMetrics metrics) {
+ StreamsMetrics metrics) {
this.id = id;
this.metrics = metrics;
this.stateMgr = stateMgr;
@@ -105,7 +105,7 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
}
@Override
- public StreamingMetrics metrics() {
+ public StreamsMetrics metrics() {
return metrics;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 4cc4ea4..861b830 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -21,8 +21,8 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,8 +50,8 @@ public class StandbyTask extends AbstractTask {
* @param topology the instance of {@link ProcessorTopology}
* @param consumer the instance of {@link Consumer}
* @param restoreConsumer the instance of {@link Consumer} used when restoring state
- * @param config the {@link StreamingConfig} specified by the user
- * @param metrics the {@link StreamingMetrics} created by the thread
+ * @param config the {@link StreamsConfig} specified by the user
+ * @param metrics the {@link StreamsMetrics} created by the thread
*/
public StandbyTask(TaskId id,
String jobId,
@@ -59,8 +59,8 @@ public class StandbyTask extends AbstractTask {
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
- StreamingConfig config,
- StreamingMetrics metrics) {
+ StreamsConfig config,
+ StreamsMetrics metrics) {
super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
// initialize the topology with its own context
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
new file mode 100644
index 0000000..5d87e5a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.ClientState;
+import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignmentException;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zookeeper.ZooDefs;
+
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.I0Itec.zkclient.ZkClient;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
+
+ private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
+
+ private StreamThread streamThread;
+
+ private int numStandbyReplicas;
+ private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
+ private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
+ private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
+ private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
+ private Map<TaskId, Set<TopicPartition>> standbyTasks;
+
+ // TODO: the following ZK dependency should be removed after KIP-4
+ private static final String ZK_TOPIC_PATH = "/brokers/topics";
+ private static final String ZK_BROKER_PATH = "/brokers/ids";
+ private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
+
+ private ZkClient zkClient;
+
+ private class ZKStringSerializer implements ZkSerializer {
+
+ @Override
+ public byte[] serialize(Object data) {
+ try {
+ return ((String) data).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes) {
+ try {
+ if (bytes == null)
+ return null;
+ else
+ return new String(bytes, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new AssertionError(e);
+ }
+ }
+ }
+
+ private List<Integer> getBrokers() {
+ List<Integer> brokers = new ArrayList<>();
+ for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
+ brokers.add(Integer.parseInt(broker));
+ }
+ Collections.sort(brokers);
+
+ log.debug("Read brokers {} from ZK in partition assignor.", brokers);
+
+ return brokers;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
+ String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
+
+ if (data == null) return null;
+
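+ // For reference, the znode value is JSON of the form (illustrative):
+ // {"version":1,"partitions":{"0":[1],"1":[2]}}
+ // i.e. a map from partition id to replica list, matching what createTopic()
+ // below writes; note that the partition ids are JSON strings on the wire,
+ // and only the "partitions" field is used here.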
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+
+ Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
+
+ });
+
+ Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
+
+ log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
+
+ return partitions;
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
+ log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
+
+ // we always assign leaders to brokers round-robin starting at the first one, with replication factor 1
+ List<Integer> brokers = getBrokers();
+
+ Map<Integer, List<Integer>> assignment = new HashMap<>();
+ for (int i = 0; i < numPartitions; i++) {
+ assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size())));
+ }
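+ // for example, with brokers [1, 2, 3] and numPartitions = 4 the assignment
+ // built above is {0=[1], 1=[2], 2=[3], 3=[1]}: single-replica partitions
+ // with leaders placed round-robin over the sorted broker ids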
+
+ // try to write to ZK with open ACL
+ try {
+ Map<String, Object> dataMap = new HashMap<>();
+ dataMap.put("version", 1);
+ dataMap.put("partitions", assignment);
+
+ ObjectMapper mapper = new ObjectMapper();
+ String data = mapper.writeValueAsString(dataMap);
+
+ zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ } catch (JsonProcessingException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ private void deleteTopic(String topic) throws ZkNodeExistsException {
+ log.debug("Deleting topic {} from ZK in partition assignor.", topic);
+
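+ // note that creating a child of /admin/delete_topics only marks the topic for
+ // deletion; the controller removes it asynchronously, so the topic may remain
+ // visible for some time after this call returns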
+ zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
+ log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
+
+ // we always assign new leaders to brokers round-robin, with replication factor 1, continuing from the end of the existing assignment
+ List<Integer> brokers = getBrokers();
+
+ int startIndex = existingAssignment.size();
+
+ Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
+
+ for (int i = 0; i < numPartitions; i++) {
+ newAssignment.put(i + startIndex, Collections.singletonList(brokers.get((i + startIndex) % brokers.size())));
+ }
+
+ // try to write to ZK with open ACL
+ try {
+ Map<String, Object> dataMap = new HashMap<>();
+ dataMap.put("version", 1);
+ dataMap.put("partitions", newAssignment);
+
+ ObjectMapper mapper = new ObjectMapper();
+ String data = mapper.writeValueAsString(dataMap);
+
+ zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
+ } catch (JsonProcessingException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ /**
+ * We need the PartitionAssignor and its StreamThread to be mutually accessible,
+ * since the former needs the latter's cached metadata while sending subscriptions,
+ * and the latter needs the former's returned assignment when adding tasks.
+ */
+ @Override
+ public void configure(Map<String, ?> configs) {
+ numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+
+ Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
+ if (o == null) {
+ KafkaException ex = new KafkaException("StreamThread is not specified");
+ log.error(ex.getMessage(), ex);
+ throw ex;
+ }
+
+ if (!(o instanceof StreamThread)) {
+ KafkaException ex = new KafkaException(o.getClass().getName() + " is not an instance of " + StreamThread.class.getName());
+ log.error(ex.getMessage(), ex);
+ throw ex;
+ }
+
+ streamThread = (StreamThread) o;
+ streamThread.partitionAssignor(this);
+
+ this.topicGroups = streamThread.builder.topicGroups();
+
+ if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG))
+ zkClient = new ZkClient((String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer());
+ }
+
+ @Override
+ public String name() {
+ return "stream";
+ }
+
+ @Override
+ public Subscription subscription(Set<String> topics) {
+ // Adds the following information to the subscription:
+ // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
+ // 2. Task ids of previously running tasks
+ // 3. Task ids of valid local states in the client's state directory
+
+ Set<TaskId> prevTasks = streamThread.prevTasks();
+ Set<TaskId> standbyTasks = streamThread.cachedTasks();
+ standbyTasks.removeAll(prevTasks);
+ SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
+
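+ // illustrative sketch (not part of this patch): the encoded userData is
+ // expected to round-trip through SubscriptionInfo.decode, which is what
+ // assign() relies on when reading each subscription, e.g.
+ // SubscriptionInfo.decode(data.encode()) should reconstruct the same
+ // process id and task sets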
+ return new Subscription(new ArrayList<>(topics), data.encode());
+ }
+
+ @Override
+ public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
+ // This assigns tasks to consumer clients in two steps.
+ // 1. Use the TaskAssignor to assign tasks to consumer clients.
+ // - Assign a task to the client that was running it previously.
+ // If there is no such client, assign it to a client that has a valid local state for it.
+ // - A client may have more than one stream thread.
+ // The assignor tries to assign tasks to a client proportionally to its number of threads.
+ // - We try not to assign the same set of tasks to two different clients.
+ // The assignment is done in one pass, so the result may not satisfy all of the above.
+ // 2. Within each client, assign tasks to its consumers in a round-robin manner.
+ Map<UUID, Set<String>> consumersByClient = new HashMap<>();
+ Map<UUID, ClientState<TaskId>> states = new HashMap<>();
+
+ // Decode subscription info
+ for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
+ String consumerId = entry.getKey();
+ Subscription subscription = entry.getValue();
+
+ SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+
+ Set<String> consumers = consumersByClient.get(info.processId);
+ if (consumers == null) {
+ consumers = new HashSet<>();
+ consumersByClient.put(info.processId, consumers);
+ }
+ consumers.add(consumerId);
+
+ ClientState<TaskId> state = states.get(info.processId);
+ if (state == null) {
+ state = new ClientState<>();
+ states.put(info.processId, state);
+ }
+
+ state.prevActiveTasks.addAll(info.prevTasks);
+ state.prevAssignedTasks.addAll(info.prevTasks);
+ state.prevAssignedTasks.addAll(info.standbyTasks);
+ state.capacity = state.capacity + 1d;
+ }
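+ // at this point each client's ClientState aggregates the previous tasks of all
+ // of its consumers, and its capacity equals its number of consumers (threads),
+ // which the TaskAssignor below uses to weight the assignment proportionally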
+
+ // get the tasks as partition groups from the partition grouper
+ Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
+ for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
+ }
+ Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
+
+ // add tasks to state topic subscribers
+ stateChangelogTopicToTaskIds = new HashMap<>();
+ internalSourceTopicToTaskIds = new HashMap<>();
+ for (TaskId task : partitionsForTask.keySet()) {
+ for (String stateName : topicGroups.get(task.topicGroupId).stateChangelogTopics) {
+ Set<TaskId> tasks = stateChangelogTopicToTaskIds.get(stateName);
+ if (tasks == null) {
+ tasks = new HashSet<>();
+ stateChangelogTopicToTaskIds.put(stateName, tasks);
+ }
+
+ tasks.add(task);
+ }
+
+ for (String topicName : topicGroups.get(task.topicGroupId).interSourceTopics) {
+ Set<TaskId> tasks = internalSourceTopicToTaskIds.get(topicName);
+ if (tasks == null) {
+ tasks = new HashSet<>();
+ internalSourceTopicToTaskIds.put(topicName, tasks);
+ }
+
+ tasks.add(task);
+ }
+ }
+
+ // assign tasks to clients
+ states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
+ Map<String, Assignment> assignment = new HashMap<>();
+
+ for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
+ UUID processId = entry.getKey();
+ Set<String> consumers = entry.getValue();
+ ClientState<TaskId> state = states.get(processId);
+
+ ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
+ final int numActiveTasks = state.activeTasks.size();
+ // active tasks come first so that the index check below (j < numActiveTasks) can distinguish them
+ taskIds.addAll(state.activeTasks);
+ for (TaskId id : state.assignedTasks) {
+ if (!state.activeTasks.contains(id))
+ taskIds.add(id);
+ }
+
+ final int numConsumers = consumers.size();
+ List<TaskId> active = new ArrayList<>();
+ Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
+
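+ // tasks are interleaved round-robin over this client's consumers: consumer i
+ // takes taskIds[i], taskIds[i + numConsumers], and so on; for example, with
+ // tasks [t0, t1, t2, t3] and two consumers, consumer 0 gets {t0, t2} and
+ // consumer 1 gets {t1, t3}; for active tasks the task id is added once per
+ // partition below so that the encoded active task list lines up one-to-one
+ // with the assigned partitions (onAssignment decodes them pairwise)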
+ int i = 0;
+ for (String consumer : consumers) {
+ List<TopicPartition> activePartitions = new ArrayList<>();
+
+ final int numTaskIds = taskIds.size();
+ for (int j = i; j < numTaskIds; j += numConsumers) {
+ TaskId taskId = taskIds.get(j);
+ if (j < numActiveTasks) {
+ for (TopicPartition partition : partitionsForTask.get(taskId)) {
+ activePartitions.add(partition);
+ active.add(taskId);
+ }
+ } else {
+ Set<TopicPartition> standbyPartitions = standby.get(taskId);
+ if (standbyPartitions == null) {
+ standbyPartitions = new HashSet<>();
+ standby.put(taskId, standbyPartitions);
+ }
+ standbyPartitions.addAll(partitionsForTask.get(taskId));
+ }
+ }
+
+ AssignmentInfo data = new AssignmentInfo(active, standby);
+ assignment.put(consumer, new Assignment(activePartitions, data.encode()));
+ i++;
+
+ active.clear();
+ standby.clear();
+ }
+ }
+
+ // if ZK is specified, get the tasks / internal topics for each state topic and validate the topic partitions
+ if (zkClient != null) {
+ log.debug("Starting to validate changelog topics in partition assignor.");
+
+ Map<String, Set<TaskId>> topicToTaskIds = new HashMap<>();
+ topicToTaskIds.putAll(stateChangelogTopicToTaskIds);
+ topicToTaskIds.putAll(internalSourceTopicToTaskIds);
+
+ for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
+ String topic = streamThread.jobId + "-" + entry.getKey();
+
+ // the expected number of partitions is the max value of TaskId.partition + 1
+ int numPartitions = 0;
+ for (TaskId task : entry.getValue()) {
+ if (numPartitions < task.partition + 1)
+ numPartitions = task.partition + 1;
+ }
+
+ boolean topicNotReady = true;
+
+ while (topicNotReady) {
+ Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
+
+ // if topic does not exist, create it
+ if (topicMetadata == null) {
+ try {
+ createTopic(topic, numPartitions);
+ } catch (ZkNodeExistsException e) {
+ // ignore and continue
+ }
+ } else if (topicMetadata.size() > numPartitions) {
+ // topic exists with more partitions than needed: delete it so that it can be
+ // re-created with the correct number of partitions on a later iteration of this loop
+ try {
+ deleteTopic(topic);
+ } catch (ZkNodeExistsException e) {
+ // ignore and continue
+ }
+ } else {
+ if (topicMetadata.size() < numPartitions) {
+ // topic exists with fewer partitions than needed: add the missing partitions
+ try {
+ addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
+ } catch (ZkNoNodeException e) {
+ // ignore and continue
+ }
+ }
+
+ topicNotReady = false;
+ }
+ }
+
+ // wait until the topic metadata has been propagated to all brokers
+ List<PartitionInfo> partitions;
+ do {
+ partitions = streamThread.restoreConsumer.partitionsFor(topic);
+ } while (partitions == null || partitions.size() != numPartitions);
+ }
+
+ log.info("Completed validating changelog topics in partition assignor.");
+ }
+
+ return assignment;
+ }
+
+ @Override
+ public void onAssignment(Assignment assignment) {
+ List<TopicPartition> partitions = assignment.partitions();
+
+ AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+ this.standbyTasks = info.standbyTasks;
+
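+ // the active task ids were encoded by assign() in the same order as the
+ // assigned partitions, so they are decoded pairwise here; e.g. partitions
+ // [p0, p1] with activeTasks [t0, t1] yields {p0=[t0], p1=[t1]}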
+ Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
+ Iterator<TaskId> iter = info.activeTasks.iterator();
+ for (TopicPartition partition : partitions) {
+ Set<TaskId> taskIds = partitionToTaskIds.get(partition);
+ if (taskIds == null) {
+ taskIds = new HashSet<>();
+ partitionToTaskIds.put(partition, taskIds);
+ }
+
+ if (iter.hasNext()) {
+ taskIds.add(iter.next());
+ } else {
+ TaskAssignmentException ex = new TaskAssignmentException(
+ "failed to find a task id for the partition=" + partition.toString() +
+ ", partitions=" + partitions.size() + ", assignmentInfo=" + info.toString()
+ );
+ log.error(ex.getMessage(), ex);
+ throw ex;
+ }
+ }
+ this.partitionToTaskIds = partitionToTaskIds;
+ }
+
+ /* For Test Only */
+ public Set<TaskId> tasksForState(String stateName) {
+ return stateChangelogTopicToTaskIds.get(stateName + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+ }
+
+ public Set<TaskId> tasksForPartition(TopicPartition partition) {
+ return partitionToTaskIds.get(partition);
+ }
+
+ public Map<TaskId, Set<TopicPartition>> standbyTasks() {
+ return standbyTasks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2e58ad5..ec3d011 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
@@ -67,8 +67,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
* @param consumer the instance of {@link Consumer}
* @param producer the instance of {@link Producer}
* @param restoreConsumer the instance of {@link Consumer} used when restoring state
- * @param config the {@link StreamingConfig} specified by the user
- * @param metrics the {@link StreamingMetrics} created by the thread
+ * @param config the {@link StreamsConfig} specified by the user
+ * @param metrics the {@link StreamsMetrics} created by the thread
*/
public StreamTask(TaskId id,
String jobId,
@@ -77,11 +77,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
Consumer<byte[], byte[]> consumer,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> restoreConsumer,
- StreamingConfig config,
- StreamingMetrics metrics) {
+ StreamsConfig config,
+ StreamsMetrics metrics) {
super(id, jobId, partitions, topology, consumer, restoreConsumer, config, false);
this.punctuationQueue = new PunctuationQueue();
- this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+ this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
// create queues for each assigned partition and associate them
// to corresponding source nodes in the processor topology
@@ -93,7 +93,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
partitionQueues.put(partition, queue);
}
- TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+ TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
// initialize the consumed offset cache