Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 01:27:32 UTC
[6/8] kafka git commit: MINOR: remove Kafka Streams in 0.9.0
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
deleted file mode 100644
index 893f7de..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.QuickUnion;
-import org.apache.kafka.streams.processor.internals.SinkNode;
-import org.apache.kafka.streams.processor.internals.SourceNode;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
- * and sinks. A {@link SourceNode source} is a node in the graph that consumes messages from one or more Kafka topics and
- * forwards them to its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from
- * upstream nodes, processes those messages, and optionally forwards new messages to one or all of its children. Finally, a {@link SinkNode sink}
- * is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
- * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link KafkaStreaming} instance
- * that will then {@link KafkaStreaming#start() begin consuming, processing, and producing messages}.
- */
-public class TopologyBuilder {
-
- // node factories in a topological order
- private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
-
- private final Set<String> sourceTopicNames = new HashSet<>();
-
- private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
- private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
- private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
- private Map<Integer, Set<String>> nodeGroups = null;
-
- private Map<String, StateStoreSupplier> stateStores = new HashMap<>();
- private Map<String, Set<String>> stateStoreUsers = new HashMap<>();
-
- private static abstract class NodeFactory {
- public final String name;
-
- NodeFactory(String name) {
- this.name = name;
- }
-
- public abstract ProcessorNode build();
- }
-
- private static class ProcessorNodeFactory extends NodeFactory {
- public final String[] parents;
- private final ProcessorSupplier supplier;
- private final Set<String> stateStoreNames = new HashSet<>();
-
- public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) {
- super(name);
- this.parents = parents.clone();
- this.supplier = supplier;
- }
-
- public void addStateStore(String stateStoreName) {
- stateStoreNames.add(stateStoreName);
- }
-
- @Override
- public ProcessorNode build() {
- return new ProcessorNode(name, supplier.get(), stateStoreNames);
- }
- }
-
- private static class SourceNodeFactory extends NodeFactory {
- public final String[] topics;
- private Deserializer keyDeserializer;
- private Deserializer valDeserializer;
-
- private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserializer, Deserializer valDeserializer) {
- super(name);
- this.topics = topics.clone();
- this.keyDeserializer = keyDeserializer;
- this.valDeserializer = valDeserializer;
- }
-
- @Override
- public ProcessorNode build() {
- return new SourceNode(name, keyDeserializer, valDeserializer);
- }
- }
-
- private static class SinkNodeFactory extends NodeFactory {
- public final String[] parents;
- public final String topic;
- private Serializer keySerializer;
- private Serializer valSerializer;
-
- private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer) {
- super(name);
- this.parents = parents.clone();
- this.topic = topic;
- this.keySerializer = keySerializer;
- this.valSerializer = valSerializer;
- }
- @Override
- public ProcessorNode build() {
- return new SinkNode(name, topic, keySerializer, valSerializer);
- }
- }
-
- /**
- * Create a new builder.
- */
- public TopologyBuilder() {}
-
- /**
- * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
- * The source will use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
- * {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
- * {@link StreamingConfig streaming configuration}.
- *
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param topics the name of one or more Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- */
- public final TopologyBuilder addSource(String name, String... topics) {
- return addSource(name, (Deserializer) null, (Deserializer) null, topics);
- }
-
- /**
- * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
- * The source will use the specified key and value deserializers.
- *
- * @param name the unique name of the source used to reference this node when
- * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
- * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
- * should use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
- * {@link StreamingConfig streaming configuration}
- * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
- * should use the {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
- * {@link StreamingConfig streaming configuration}
- * @param topics the name of one or more Kafka topics that this source is to consume
- * @return this builder instance so methods can be chained together; never null
- */
- public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
- if (nodeFactories.containsKey(name))
- throw new TopologyException("Processor " + name + " is already added.");
-
- for (String topic : topics) {
- if (sourceTopicNames.contains(topic))
- throw new TopologyException("Topic " + topic + " has already been registered by another source.");
-
- sourceTopicNames.add(topic);
- }
-
- nodeFactories.put(name, new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
- nodeToTopics.put(name, topics.clone());
- nodeGrouper.add(name);
-
- return this;
- }
-
- /**
- * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
- * The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
- * {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
- * {@link StreamingConfig streaming configuration}.
- *
- * @param name the unique name of the sink
- * @param topic the name of the Kafka topic to which this sink should write its messages
- * @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
- * and write to its topic
- * @return this builder instance so methods can be chained together; never null
- */
- public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
- return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
- }
-
- /**
- * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
- * The sink will use the specified key and value serializers.
- *
- * @param name the unique name of the sink
- * @param topic the name of the Kafka topic to which this sink should write its messages
- * @param keySerializer the {@link Serializer key serializer} used when producing messages; may be null if the sink
- * should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
- * {@link StreamingConfig streaming configuration}
- * @param valSerializer the {@link Serializer value serializer} used when producing messages; may be null if the sink
- * should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
- * {@link StreamingConfig streaming configuration}
- * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
- * and write to its topic
- * @return this builder instance so methods can be chained together; never null
- */
- public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
- if (nodeFactories.containsKey(name))
- throw new TopologyException("Processor " + name + " is already added.");
-
- if (parentNames != null) {
- for (String parent : parentNames) {
- if (parent.equals(name)) {
- throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
- }
- if (!nodeFactories.containsKey(parent)) {
- throw new TopologyException("Parent processor " + parent + " is not added yet.");
- }
- }
- }
-
- nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
- return this;
- }
-
- /**
- * Add a new processor node that receives and processes messages output by one or more parent source or processor nodes.
- * Any new messages output by this processor will be forwarded to its child processor or sink nodes.
- * @param name the unique name of the processor node
- * @param supplier the supplier used to obtain this node's {@link Processor} instance
- * @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive
- * and process
- * @return this builder instance so methods can be chained together; never null
- */
- public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
- if (nodeFactories.containsKey(name))
- throw new TopologyException("Processor " + name + " is already added.");
-
- if (parentNames != null) {
- for (String parent : parentNames) {
- if (parent.equals(name)) {
- throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
- }
- if (!nodeFactories.containsKey(parent)) {
- throw new TopologyException("Parent processor " + parent + " is not added yet.");
- }
- }
- }
-
- nodeFactories.put(name, new ProcessorNodeFactory(name, parentNames, supplier));
- nodeGrouper.add(name);
- nodeGrouper.unite(name, parentNames);
- return this;
- }
-
- /**
- * Adds a state store.
- *
- * @param supplier the supplier used to obtain this state store's {@link StateStore} instance
- * @param processorNames the names of the processors that should be able to access the added store
- * @return this builder instance so methods can be chained together; never null
- */
- public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
- if (stateStores.containsKey(supplier.name())) {
- throw new TopologyException("StateStore " + supplier.name() + " is already added.");
- }
- stateStores.put(supplier.name(), supplier);
- stateStoreUsers.put(supplier.name(), new HashSet<String>());
-
- if (processorNames != null) {
- for (String processorName : processorNames) {
- connectProcessorAndStateStore(processorName, supplier.name());
- }
- }
-
- return this;
- }
-
- /**
- * Connects the processor and the state stores
- *
- * @param processorName the name of the processor
- * @param stateStoreNames the names of state stores that the processor uses
- * @return this builder instance so methods can be chained together; never null
- */
- public final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
- if (stateStoreNames != null) {
- for (String stateStoreName : stateStoreNames) {
- connectProcessorAndStateStore(processorName, stateStoreName);
- }
- }
-
- return this;
- }
-
- private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
- if (!stateStores.containsKey(stateStoreName))
- throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
- if (!nodeFactories.containsKey(processorName))
- throw new TopologyException("Processor " + processorName + " is not added yet.");
-
- Set<String> users = stateStoreUsers.get(stateStoreName);
- Iterator<String> iter = users.iterator();
- if (iter.hasNext()) {
- String user = iter.next();
- nodeGrouper.unite(user, processorName);
- }
- users.add(processorName);
-
- NodeFactory factory = nodeFactories.get(processorName);
- if (factory instanceof ProcessorNodeFactory) {
- ((ProcessorNodeFactory) factory).addStateStore(stateStoreName);
- } else {
- throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
- }
- }
-
- /**
- * Returns the map of topic groups keyed by the group id.
- * A topic group is a group of topics in the same task.
- *
- * @return groups of topic names
- */
- public Map<Integer, Set<String>> topicGroups() {
- Map<Integer, Set<String>> topicGroups = new HashMap<>();
-
- if (nodeGroups == null)
- nodeGroups = makeNodeGroups();
-
- for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
- Set<String> topicGroup = new HashSet<>();
- for (String node : entry.getValue()) {
- String[] topics = nodeToTopics.get(node);
- if (topics != null)
- topicGroup.addAll(Arrays.asList(topics));
- }
- topicGroups.put(entry.getKey(), Collections.unmodifiableSet(topicGroup));
- }
-
- return Collections.unmodifiableMap(topicGroups);
- }
-
- /**
- * Returns the map of node groups keyed by the topic group id.
- *
- * @return groups of node names
- */
- public Map<Integer, Set<String>> nodeGroups() {
- if (nodeGroups == null)
- nodeGroups = makeNodeGroups();
-
- return nodeGroups;
- }
-
- private Map<Integer, Set<String>> makeNodeGroups() {
- HashMap<Integer, Set<String>> nodeGroups = new HashMap<>();
- HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
-
- int nodeGroupId = 0;
-
- // Go through source nodes first. This makes the group id assignment easy to predict in tests
- for (String nodeName : Utils.sorted(nodeToTopics.keySet())) {
- String root = nodeGrouper.root(nodeName);
- Set<String> nodeGroup = rootToNodeGroup.get(root);
- if (nodeGroup == null) {
- nodeGroup = new HashSet<>();
- rootToNodeGroup.put(root, nodeGroup);
- nodeGroups.put(nodeGroupId++, nodeGroup);
- }
- nodeGroup.add(nodeName);
- }
-
- // Go through non-source nodes
- for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
- if (!nodeToTopics.containsKey(nodeName)) {
- String root = nodeGrouper.root(nodeName);
- Set<String> nodeGroup = rootToNodeGroup.get(root);
- if (nodeGroup == null) {
- nodeGroup = new HashSet<>();
- rootToNodeGroup.put(root, nodeGroup);
- nodeGroups.put(nodeGroupId++, nodeGroup);
- }
- nodeGroup.add(nodeName);
- }
- }
-
- return nodeGroups;
- }
-
- /**
- * Asserts that the streams of the specified source nodes must be copartitioned.
- *
- * @param sourceNodes a set of source node names
- * @return this builder instance so methods can be chained together; never null
- */
- public final TopologyBuilder copartitionSources(Collection<String> sourceNodes) {
- copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
- return this;
- }
-
- /**
- * Returns the copartition groups.
- * A copartition group is a group of topics that are required to be copartitioned.
- *
- * @return groups of topic names
- */
- public Collection<Set<String>> copartitionGroups() {
- List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
- for (Set<String> nodeNames : copartitionSourceGroups) {
- Set<String> copartitionGroup = new HashSet<>();
- for (String node : nodeNames) {
- String[] topics = nodeToTopics.get(node);
- if (topics != null)
- copartitionGroup.addAll(Arrays.asList(topics));
- }
- list.add(Collections.unmodifiableSet(copartitionGroup));
- }
- return Collections.unmodifiableList(list);
- }
-
- /**
- * Build the topology for the specified topic group. This is called automatically when passing this builder into the
- * {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor.
- *
- * @see KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)
- */
- public ProcessorTopology build(Integer topicGroupId) {
- Set<String> nodeGroup;
- if (topicGroupId != null) {
- nodeGroup = nodeGroups().get(topicGroupId);
- } else {
- // when nodeGroup is null, we build the full topology; this is used in some tests.
- nodeGroup = null;
- }
- return build(nodeGroup);
- }
-
- @SuppressWarnings("unchecked")
- private ProcessorTopology build(Set<String> nodeGroup) {
- List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
- Map<String, ProcessorNode> processorMap = new HashMap<>();
- Map<String, SourceNode> topicSourceMap = new HashMap<>();
- Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>();
-
- try {
- // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
- for (NodeFactory factory : nodeFactories.values()) {
- if (nodeGroup == null || nodeGroup.contains(factory.name)) {
- ProcessorNode node = factory.build();
- processorNodes.add(node);
- processorMap.put(node.name(), node);
-
- if (factory instanceof ProcessorNodeFactory) {
- for (String parent : ((ProcessorNodeFactory) factory).parents) {
- processorMap.get(parent).addChild(node);
- }
- for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
- if (!stateStoreMap.containsKey(stateStoreName)) {
- stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
- }
- }
- } else if (factory instanceof SourceNodeFactory) {
- for (String topic : ((SourceNodeFactory) factory).topics) {
- topicSourceMap.put(topic, (SourceNode) node);
- }
- } else if (factory instanceof SinkNodeFactory) {
- for (String parent : ((SinkNodeFactory) factory).parents) {
- processorMap.get(parent).addChild(node);
- }
- } else {
- throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
- }
- }
- }
- } catch (Exception e) {
- throw new KafkaException("ProcessorNode construction failed: this should not happen.");
- }
-
- return new ProcessorTopology(processorNodes, topicSourceMap, new ArrayList<>(stateStoreMap.values()));
- }
-
- /**
- * Get the names of topics that are to be consumed by the source nodes created by this builder.
- * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
- */
- public Set<String> sourceTopics() {
- return Collections.unmodifiableSet(sourceTopicNames);
- }
-}
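For context on what is being removed: a minimal usage sketch of this builder, assuming a hypothetical user-supplied ProcessorSupplier implementation called MyProcessorSupplier:

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("SOURCE", "source-topic")                           // consume "source-topic"
           .addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE")  // child of SOURCE
           .addSink("SINK", "sink-topic", "PROCESS");                     // write PROCESS output to "sink-topic"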
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
deleted file mode 100644
index 99d1405..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.KafkaException;
-
-public class TopologyException extends KafkaException {
-
- private static final long serialVersionUID = 1L;
-
- public TopologyException(String message) {
- super(message);
- }
-
- public TopologyException(String name, Object value) {
- this(name, value, null);
- }
-
- public TopologyException(String name, Object value, String message) {
- super("Invalid topology building" + (message == null ? "" : ": " + message));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
deleted file mode 100644
index f7b14ad..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.PartitionGrouper;
-import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Configurable {
-
- private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
-
- private PartitionGrouper partitionGrouper;
- private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
-
- @Override
- public void configure(Map<String, ?> configs) {
- Object o = configs.get(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE);
- if (o == null)
- throw new KafkaException("PartitionGrouper is not specified");
-
- if (!PartitionGrouper.class.isInstance(o))
- throw new KafkaException(o.getClass().getName() + " is not an instance of " + PartitionGrouper.class.getName());
-
- partitionGrouper = (PartitionGrouper) o;
- partitionGrouper.partitionAssignor(this);
- }
-
- @Override
- public String name() {
- return "streaming";
- }
-
- @Override
- public Subscription subscription(Set<String> topics) {
- return new Subscription(new ArrayList<>(topics));
- }
-
- @Override
- public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
- Map<TaskId, Set<TopicPartition>> partitionGroups = partitionGrouper.partitionGroups(metadata);
-
- String[] clientIds = subscriptions.keySet().toArray(new String[subscriptions.size()]);
- TaskId[] taskIds = partitionGroups.keySet().toArray(new TaskId[partitionGroups.size()]);
-
- Map<String, Assignment> assignment = new HashMap<>();
-
- for (int i = 0; i < clientIds.length; i++) {
- List<TopicPartition> partitions = new ArrayList<>();
- List<TaskId> ids = new ArrayList<>();
- for (int j = i; j < taskIds.length; j += clientIds.length) {
- TaskId taskId = taskIds[j];
- for (TopicPartition partition : partitionGroups.get(taskId)) {
- partitions.add(partition);
- ids.add(taskId);
- }
- }
- ByteBuffer buf = ByteBuffer.allocate(4 + ids.size() * 8);
- // version
- buf.putInt(1);
- // encode task ids
- for (TaskId id : ids) {
- buf.putInt(id.topicGroupId);
- buf.putInt(id.partition);
- }
- buf.rewind();
- assignment.put(clientIds[i], new Assignment(partitions, buf));
- }
-
- return assignment;
- }
-
- @Override
- public void onAssignment(Assignment assignment) {
- List<TopicPartition> partitions = assignment.partitions();
- ByteBuffer data = assignment.userData();
- data.rewind();
-
- Map<TopicPartition, Set<TaskId>> partitionToTaskIds = new HashMap<>();
-
- // check version
- int version = data.getInt();
- if (version == 1) {
- for (TopicPartition partition : partitions) {
- Set<TaskId> taskIds = partitionToTaskIds.get(partition);
- if (taskIds == null) {
- taskIds = new HashSet<>();
- partitionToTaskIds.put(partition, taskIds);
- }
- // decode a task id
- taskIds.add(new TaskId(data.getInt(), data.getInt()));
- }
- } else {
- KafkaException ex = new KafkaException("unknown assignment data version: " + version);
- log.error(ex.getMessage(), ex);
- throw ex;
- }
- this.partitionToTaskIds = partitionToTaskIds;
- }
-
- public Set<TaskId> taskIds(TopicPartition partition) {
- return partitionToTaskIds.get(partition);
- }
-
-}
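The assignment userData above is a small fixed format: a 4-byte version followed by one (topicGroupId, partition) int pair per task id. A hedged round-trip sketch (the task id values are illustrative):

    List<TaskId> ids = Arrays.asList(new TaskId(0, 3), new TaskId(1, 2));
    ByteBuffer buf = ByteBuffer.allocate(4 + ids.size() * 8);
    buf.putInt(1);                       // version
    for (TaskId id : ids) {              // each task id encodes as two ints
        buf.putInt(id.topicGroupId);
        buf.putInt(id.partition);
    }
    buf.rewind();

    int version = buf.getInt();                               // 1
    TaskId decoded = new TaskId(buf.getInt(), buf.getInt());  // TaskId(0, 3)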
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
deleted file mode 100644
index 717df2c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import java.util.LinkedList;
-
-/**
- * MinTimestampTracker is a {@link TimestampTracker} implementation that maintains the minimum
- * timestamp of the stamped elements it tracks.
- */
-public class MinTimestampTracker<E> implements TimestampTracker<E> {
-
- private final LinkedList<Stamped<E>> descendingSubsequence = new LinkedList<>();
-
- // in the case that incoming traffic is very small, the records may be put and polled
- // within a single iteration; in this case we need to remember the last polled
- // record's timestamp
- private long lastKnownTime = NOT_KNOWN;
-
- public void addElement(Stamped<E> elem) {
- if (elem == null) throw new NullPointerException();
-
- Stamped<E> minElem = descendingSubsequence.peekLast();
- while (minElem != null && minElem.timestamp >= elem.timestamp) {
- descendingSubsequence.removeLast();
- minElem = descendingSubsequence.peekLast();
- }
- descendingSubsequence.offerLast(elem);
- }
-
- public void removeElement(Stamped<E> elem) {
- if (elem != null && descendingSubsequence.peekFirst() == elem) {
- descendingSubsequence.removeFirst();
-
- if (descendingSubsequence.isEmpty())
- lastKnownTime = elem.timestamp;
- }
- }
-
- public int size() {
- return descendingSubsequence.size();
- }
-
- public long get() {
- Stamped<E> stamped = descendingSubsequence.peekFirst();
-
- if (stamped == null)
- return lastKnownTime;
- else
- return stamped.timestamp;
- }
-
-}
\ No newline at end of file
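The tracker is a classic monotonic queue: addElement evicts trailing elements whose timestamps are >= the new element's, so the head of the deque is always the minimum. An illustrative sketch, assuming Stamped pairs a value with a long timestamp, as its use in this class implies:

    MinTimestampTracker<String> tracker = new MinTimestampTracker<>();
    tracker.addElement(new Stamped<>("a", 5L));
    tracker.addElement(new Stamped<>("b", 3L));  // evicts "a" from the tail since 5 >= 3
    tracker.addElement(new Stamped<>("c", 7L));  // kept; the deque is now [3, 7]
    long min = tracker.get();                    // 3, the minimum timestamp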
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
deleted file mode 100644
index d888085..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-
-/**
- * A PartitionGroup is composed of a set of partitions. It also maintains the timestamp of this
- * group, and hence of the associated task, as the min timestamp across all partitions in the group.
- */
-public class PartitionGroup {
-
- private final Map<TopicPartition, RecordQueue> partitionQueues;
-
- private final PriorityQueue<RecordQueue> queuesByTime;
-
- private final TimestampExtractor timestampExtractor;
-
- public static class RecordInfo {
- public RecordQueue queue;
-
- public ProcessorNode node() {
- return queue.source();
- }
-
- public TopicPartition partition() {
- return queue.partition();
- }
- }
-
- // since task is thread-safe, we do not need to synchronize on local variables
- private int totalBuffered;
-
- public PartitionGroup(Map<TopicPartition, RecordQueue> partitionQueues, TimestampExtractor timestampExtractor) {
- this.queuesByTime = new PriorityQueue<>(partitionQueues.size(), new Comparator<RecordQueue>() {
-
- @Override
- public int compare(RecordQueue queue1, RecordQueue queue2) {
- // order queues by their current head-record timestamp, smallest first
- return Long.compare(queue1.timestamp(), queue2.timestamp());
- }
- });
-
- this.partitionQueues = partitionQueues;
-
- this.timestampExtractor = timestampExtractor;
-
- this.totalBuffered = 0;
- }
-
- /**
- * Get the next record and the queue it came from.
- *
- * @return the next StampedRecord, or null if no records are buffered
- */
- public StampedRecord nextRecord(RecordInfo info) {
- StampedRecord record = null;
-
- RecordQueue queue = queuesByTime.poll();
- if (queue != null) {
- // get the first record from this queue.
- record = queue.poll();
-
- if (queue.size() > 0) {
- queuesByTime.offer(queue);
- }
- }
- info.queue = queue;
-
- if (record != null) totalBuffered--;
-
- return record;
- }
-
- /**
- * Adds raw records to this partition group
- *
- * @param partition the partition
- * @param rawRecords the raw records
- * @return the queue size for the partition
- */
- public int addRawRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
- RecordQueue recordQueue = partitionQueues.get(partition);
-
- int oldSize = recordQueue.size();
- int newSize = recordQueue.addRawRecords(rawRecords, timestampExtractor);
-
- // add this record queue to be considered for processing in the future if it was empty before
- if (oldSize == 0 && newSize > 0) {
- queuesByTime.offer(recordQueue);
- }
-
- totalBuffered += newSize - oldSize;
-
- return newSize;
- }
-
- public Set<TopicPartition> partitions() {
- return Collections.unmodifiableSet(partitionQueues.keySet());
- }
-
- /**
- * Return the timestamp of this partition group as the smallest
- * partition timestamp among all its partitions
- */
- public long timestamp() {
- if (queuesByTime.isEmpty()) {
- // if there is no data in any partition, return the smallest of their last known times
- long timestamp = Long.MAX_VALUE;
- for (RecordQueue queue : partitionQueues.values()) {
- if (timestamp > queue.timestamp())
- timestamp = queue.timestamp();
- }
- return timestamp;
- } else {
- return queuesByTime.peek().timestamp();
- }
- }
-
- public int numBuffered(TopicPartition partition) {
- RecordQueue recordQueue = partitionQueues.get(partition);
-
- if (recordQueue == null)
- throw new KafkaException("Record's partition does not belong to this partition-group.");
-
- return recordQueue.size();
- }
-
- public int topQueueSize() {
- RecordQueue recordQueue = queuesByTime.peek();
- return (recordQueue == null) ? 0 : recordQueue.size();
- }
-
- public int numBuffered() {
- return totalBuffered;
- }
-
- public void close() {
- queuesByTime.clear();
- partitionQueues.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
deleted file mode 100644
index 1321cc5..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.TaskId;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
-
- private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class);
-
- private final TaskId id;
- private final StreamTask task;
- private final StreamingMetrics metrics;
- private final RecordCollector collector;
- private final ProcessorStateManager stateMgr;
-
- private final Serializer<?> keySerializer;
- private final Serializer<?> valSerializer;
- private final Deserializer<?> keyDeserializer;
- private final Deserializer<?> valDeserializer;
-
- private boolean initialized;
-
- @SuppressWarnings("unchecked")
- public ProcessorContextImpl(TaskId id,
- StreamTask task,
- StreamingConfig config,
- RecordCollector collector,
- ProcessorStateManager stateMgr,
- StreamingMetrics metrics) {
- this.id = id;
- this.task = task;
- this.metrics = metrics;
- this.collector = collector;
- this.stateMgr = stateMgr;
-
- this.keySerializer = config.keySerializer();
- this.valSerializer = config.valueSerializer();
- this.keyDeserializer = config.keyDeserializer();
- this.valDeserializer = config.valueDeserializer();
-
- this.initialized = false;
- }
-
- public void initialized() {
- this.initialized = true;
- }
-
- public TaskId id() {
- return id;
- }
-
- public ProcessorStateManager getStateMgr() {
- return stateMgr;
- }
-
- @Override
- public RecordCollector recordCollector() {
- return this.collector;
- }
-
- @Override
- public Serializer<?> keySerializer() {
- return this.keySerializer;
- }
-
- @Override
- public Serializer<?> valueSerializer() {
- return this.valSerializer;
- }
-
- @Override
- public Deserializer<?> keyDeserializer() {
- return this.keyDeserializer;
- }
-
- @Override
- public Deserializer<?> valueDeserializer() {
- return this.valDeserializer;
- }
-
- @Override
- public File stateDir() {
- return stateMgr.baseDir();
- }
-
- @Override
- public StreamingMetrics metrics() {
- return metrics;
- }
-
- @Override
- public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
- if (initialized)
- throw new KafkaException("Can only create state stores during initialization.");
-
- stateMgr.register(store, stateRestoreCallback);
- }
-
- @Override
- public StateStore getStateStore(String name) {
- ProcessorNode node = task.node();
-
- if (node == null)
- throw new KafkaException("accessing from an unknown node");
-
- if (!node.stateStores.contains(name))
- throw new KafkaException("Processor " + node.name() + " has no access to StateStore " + name);
-
- return stateMgr.getStore(name);
- }
-
- @Override
- public String topic() {
- if (task.record() == null)
- throw new IllegalStateException("this should not happen as topic() should only be called while a record is processed");
-
- return task.record().topic();
- }
-
- @Override
- public int partition() {
- if (task.record() == null)
- throw new IllegalStateException("this should not happen as partition() should only be called while a record is processed");
-
- return task.record().partition();
- }
-
- @Override
- public long offset() {
- if (this.task.record() == null)
- throw new IllegalStateException("this should not happen as offset() should only be called while a record is processed");
-
- return this.task.record().offset();
- }
-
- @Override
- public long timestamp() {
- if (task.record() == null)
- throw new IllegalStateException("this should not happen as timestamp() should only be called while a record is processed");
-
- return task.record().timestamp;
- }
-
- @Override
- public <K, V> void forward(K key, V value) {
- task.forward(key, value);
- }
-
- @Override
- public <K, V> void forward(K key, V value, int childIndex) {
- task.forward(key, value, childIndex);
- }
-
- @Override
- public void commit() {
- task.needCommit();
- }
-
- @Override
- public void schedule(long interval) {
- task.schedule(interval);
- }
-}
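A hedged sketch of a Processor driving this context API; the init/process/close shape follows ProcessorNode in this same commit, and punctuate(long) is assumed to be part of this era's Processor interface:

    public class FilterProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            context.schedule(1000);           // delegates to StreamTask.schedule(interval)
        }

        @Override
        public void process(String key, String value) {
            if (value != null)
                context.forward(key, value);  // delegates to StreamTask.forward(key, value)
        }

        @Override
        public void punctuate(long timestamp) {}  // assumed no-op for this sketch

        @Override
        public void close() {}
    }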
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
deleted file mode 100644
index 6db83a1..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-public class ProcessorNode<K, V> {
-
- private final List<ProcessorNode<?, ?>> children;
-
- private final String name;
- private final Processor<K, V> processor;
-
- public final Set<String> stateStores;
-
- public ProcessorNode(String name) {
- this(name, null, null);
- }
-
- public ProcessorNode(String name, Processor<K, V> processor, Set<String> stateStores) {
- this.name = name;
- this.processor = processor;
- this.children = new ArrayList<>();
- this.stateStores = stateStores;
- }
-
- public final String name() {
- return name;
- }
-
- public final Processor processor() {
- return processor;
- }
-
- public final List<ProcessorNode<?, ?>> children() {
- return children;
- }
-
- public void addChild(ProcessorNode<?, ?> child) {
- children.add(child);
- }
-
- public void init(ProcessorContext context) {
- processor.init(context);
- }
-
- public void process(K key, V value) {
- processor.process(key, value);
- }
-
- public void close() {
- processor.close();
- }
-}
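Wiring these nodes by hand is straightforward; a hedged sketch (MyProcessor and context are hypothetical stand-ins for a Processor implementation and a ProcessorContext):

    ProcessorNode<String, String> parent = new ProcessorNode<>("PARENT", new MyProcessor(), Collections.<String>emptySet());
    ProcessorNode<String, String> child = new ProcessorNode<>("CHILD", new MyProcessor(), Collections.<String>emptySet());
    parent.addChild(child);
    parent.init(context);            // the task initializes each node before processing
    parent.process("key", "value");  // delegates to MyProcessor.process; forwarding to children goes through the task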
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
deleted file mode 100644
index 3cb9cea..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.state.OffsetCheckpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-public class ProcessorStateManager {
-
- private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
-
- public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
- public static final String LOCK_FILE_NAME = ".lock";
-
- private final int partition;
- private final File baseDir;
- private final FileLock directoryLock;
- private final Map<String, StateStore> stores;
- private final Consumer<byte[], byte[]> restoreConsumer;
- private final Map<TopicPartition, Long> restoredOffsets;
- private final Map<TopicPartition, Long> checkpointedOffsets;
-
- public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer) throws IOException {
- this.partition = partition;
- this.baseDir = baseDir;
- this.stores = new HashMap<>();
- this.restoreConsumer = restoreConsumer;
- this.restoredOffsets = new HashMap<>();
-
- // create the state directory for this task if missing (we won't create the parent directory)
- createStateDirectory(baseDir);
-
- // try to acquire the exclusive lock on the state directory
- directoryLock = lockStateDirectory(baseDir);
- if (directoryLock == null) {
- throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath());
- }
-
- // load the checkpoint information
- OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
- this.checkpointedOffsets = new HashMap<>(checkpoint.read());
-
- // delete the checkpoint file after loading its stored offsets
- checkpoint.delete();
- }
-
- private static void createStateDirectory(File stateDir) throws IOException {
- if (!stateDir.exists()) {
- stateDir.mkdir();
- }
- }
-
- public static FileLock lockStateDirectory(File stateDir) throws IOException {
- File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
- FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
- try {
- return channel.tryLock();
- } catch (OverlappingFileLockException e) {
- return null;
- }
- }
-
- public File baseDir() {
- return this.baseDir;
- }
-
- public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
- if (store.name().equals(CHECKPOINT_FILE_NAME))
- throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
-
- if (this.stores.containsKey(store.name()))
- throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
-
- // ---- register the store ---- //
-
- // check whether the underlying change log topic exists
- if (restoreConsumer.listTopics().containsKey(store.name())) {
- boolean partitionNotFound = true;
- for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(store.name())) {
- if (partitionInfo.partition() == partition) {
- partitionNotFound = false;
- break;
- }
- }
-
- if (partitionNotFound)
- throw new IllegalStateException("Store " + store.name() + "'s change log does not contain the partition " + partition);
-
- } else {
- throw new IllegalStateException("Change log topic for store " + store.name() + " does not exist yet");
- }
-
- this.stores.put(store.name(), store);
-
- // ---- try to restore the state from change-log ---- //
-
- // subscribe to the store's partition
- TopicPartition storePartition = new TopicPartition(store.name(), partition);
- if (!restoreConsumer.subscription().isEmpty()) {
- throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
- }
- restoreConsumer.assign(Collections.singletonList(storePartition));
-
- // calculate the end offset of the partition
- // TODO: this is a bit hacky to first seek then position to get the end offset
- restoreConsumer.seekToEnd(storePartition);
- long endOffset = restoreConsumer.position(storePartition);
-
- // restore from the checkpointed offset of the change log if it is persistent and the offset exists;
- // restore the state from the beginning of the change log otherwise
- if (checkpointedOffsets.containsKey(storePartition) && store.persistent()) {
- restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
- } else {
- restoreConsumer.seekToBeginning(storePartition);
- }
-
- // restore its state from changelog records; while restoring, the log end offset
- // should not change since it is only written by this thread.
- while (true) {
- for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
- stateRestoreCallback.restore(record.key(), record.value());
- }
-
- if (restoreConsumer.position(storePartition) == endOffset) {
- break;
- } else if (restoreConsumer.position(storePartition) > endOffset) {
- throw new IllegalStateException("Log end offset should not change while restoring");
- }
- }
-
- // record the restored offset for its change log partition
- long newOffset = restoreConsumer.position(storePartition);
- restoredOffsets.put(storePartition, newOffset);
-
- // un-assign the change log partition
- restoreConsumer.assign(Collections.<TopicPartition>emptyList());
- }
-
- public StateStore getStore(String name) {
- return stores.get(name);
- }
-
- public void cleanup() throws IOException {
- // clean up any unknown files in the state directory
- for (File file : this.baseDir.listFiles()) {
- if (!this.stores.containsKey(file.getName())) {
- log.info("Deleting state directory {}", file.getAbsolutePath());
- file.delete();
- }
- }
- }
-
- public void flush() {
- if (!this.stores.isEmpty()) {
- log.debug("Flushing stores.");
- for (StateStore store : this.stores.values())
- store.flush();
- }
- }
-
- public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
- if (!stores.isEmpty()) {
- log.debug("Closing stores.");
- for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
- log.debug("Closing storage engine {}", entry.getKey());
- entry.getValue().flush();
- entry.getValue().close();
- }
-
- Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
- for (String storeName : stores.keySet()) {
- TopicPartition part = new TopicPartition(storeName, partition);
-
- // only checkpoint the offset to the offsets file if the store is persistent
- if (stores.get(storeName).persistent()) {
- Long offset = ackedOffsets.get(part);
-
- if (offset == null) {
- // if no record was produced, we need to check the restored offset.
- offset = restoredOffsets.get(part);
- }
-
- if (offset != null) {
- // store the last offset + 1 (the log position after restoration)
- checkpointOffsets.put(part, offset + 1);
- }
- }
- }
-
- // write the checkpoint file before closing, to indicate clean shutdown
- OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
- checkpoint.write(checkpointOffsets);
- }
-
- // release the state directory lock
- directoryLock.release();
- }
-
-}
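The checkpoint convention above stores offset + 1 (the next position to read) and deletes the file once consumed on startup. A hedged round trip using only the OffsetCheckpoint calls seen in this class (baseDir and lastAckedOffset are illustrative):

    OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ".checkpoint"));

    Map<TopicPartition, Long> offsets = new HashMap<>();
    offsets.put(new TopicPartition("my-store", 0), lastAckedOffset + 1);  // next position to restore from
    checkpoint.write(offsets);   // written on clean shutdown

    Map<TopicPartition, Long> restored = checkpoint.read();  // read on the next startup
    checkpoint.delete();         // then removed until the next clean shutdown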
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
deleted file mode 100644
index a70aa70..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class ProcessorTopology {
-
- private final List<ProcessorNode> processorNodes;
- private final Map<String, SourceNode> sourceByTopics;
- private final List<StateStoreSupplier> stateStoreSuppliers;
-
- public ProcessorTopology(List<ProcessorNode> processorNodes,
- Map<String, SourceNode> sourceByTopics,
- List<StateStoreSupplier> stateStoreSuppliers) {
- this.processorNodes = Collections.unmodifiableList(processorNodes);
- this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics);
- this.stateStoreSuppliers = Collections.unmodifiableList(stateStoreSuppliers);
- }
-
- public Set<String> sourceTopics() {
- return sourceByTopics.keySet();
- }
-
- public SourceNode source(String topic) {
- return sourceByTopics.get(topic);
- }
-
- public Set<SourceNode> sources() {
- return new HashSet<>(sourceByTopics.values());
- }
-
- public List<ProcessorNode> processors() {
- return processorNodes;
- }
-
- public List<StateStoreSupplier> stateStoreSuppliers() {
- return stateStoreSuppliers;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
deleted file mode 100644
index b4b7afe..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import java.util.PriorityQueue;
-
-public class PunctuationQueue {
-
- private PriorityQueue<PunctuationSchedule> pq = new PriorityQueue<>();
-
- public void schedule(PunctuationSchedule sched) {
- synchronized (pq) {
- pq.add(sched);
- }
- }
-
- public void close() {
- synchronized (pq) {
- pq.clear();
- }
- }
-
- public boolean mayPunctuate(long timestamp, Punctuator punctuator) {
- synchronized (pq) {
- boolean punctuated = false;
- PunctuationSchedule top = pq.peek();
- while (top != null && top.timestamp <= timestamp) {
- PunctuationSchedule sched = top;
- pq.poll();
- punctuator.punctuate(sched.node(), timestamp);
- pq.add(sched.next());
- punctuated = true;
-
- top = pq.peek();
- }
-
- return punctuated;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
deleted file mode 100644
index dc9a50d..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-public class PunctuationSchedule extends Stamped<ProcessorNode> {
-
- final long interval;
-
- public PunctuationSchedule(ProcessorNode node, long interval) {
- this(node, System.currentTimeMillis(), interval);
- }
-
- public PunctuationSchedule(ProcessorNode node, long time, long interval) {
- super(node, time + interval);
- this.interval = interval;
- }
-
- public ProcessorNode node() {
- return value;
- }
-
- public PunctuationSchedule next() {
- return new PunctuationSchedule(value, timestamp, interval);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java
deleted file mode 100644
index d99e2ae..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-public interface Punctuator {
-
- void punctuate(ProcessorNode node, long streamTime);
-
-}
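
Taken together, PunctuationQueue, PunctuationSchedule, and Punctuator form a
small timer facility driven by stream time: schedules sit in a priority queue
ordered by deadline (via Stamped.compareTo, further below), and mayPunctuate()
drains every schedule whose deadline has passed, re-arming each at its old
deadline plus its interval. A sketch, assuming a node taken from an already
built, non-empty topology:

    ProcessorNode node = topology.processors().get(0);

    PunctuationQueue queue = new PunctuationQueue();
    queue.schedule(new PunctuationSchedule(node, 0L, 100L)); // first deadline: 100

    // At stream time 250 the schedule is overdue twice (deadlines 100 and 200),
    // so the punctuator fires twice before the queue goes quiet again.
    queue.mayPunctuate(250L, new Punctuator() {
        public void punctuate(ProcessorNode n, long streamTime) {
            System.out.println("punctuated at " + streamTime);
        }
    });

Two behaviors are visible in the code above: punctuate() receives the caller's
stream time, not the schedule's own deadline, and next() is computed from the
old deadline rather than from "now", so the cadence stays anchored to the
original schedule instead of drifting.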
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
deleted file mode 100644
index 087cbd2..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import java.util.HashMap;
-import java.util.NoSuchElementException;
-
-public class QuickUnion<T> {
-
- private HashMap<T, T> ids = new HashMap<>();
-
- public void add(T id) {
- ids.put(id, id);
- }
-
- public boolean exists(T id) {
- return ids.containsKey(id);
- }
-
- public T root(T id) {
- T current = id;
- T parent = ids.get(current);
-
- if (parent == null)
- throw new NoSuchElementException("id: " + id.toString());
-
- while (!parent.equals(current)) {
- // do the path compression
- T grandparent = ids.get(parent);
- ids.put(current, grandparent);
-
- current = parent;
- parent = grandparent;
- }
- return current;
- }
-
- public void unite(T id1, T... idList) {
- for (T id2 : idList) {
- unitePair(id1, id2);
- }
- }
-
- private void unitePair(T id1, T id2) {
- T root1 = root(id1);
- T root2 = root(id2);
-
- if (!root1.equals(root2))
- ids.put(root1, root2);
- }
-
-}
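
QuickUnion is a textbook disjoint-set (union-find) structure; the loop in
root() performs path compression by re-pointing each visited node at its
grandparent, so repeated lookups flatten the trees. A small usage sketch:

    QuickUnion<String> groups = new QuickUnion<>();
    groups.add("a");
    groups.add("b");
    groups.add("c");

    groups.unite("a", "b", "c");   // varargs: unite "a" with "b", then with "c"
    groups.root("b").equals(groups.root("c"));   // true: one connected group

    groups.root("unknown");        // throws NoSuchElementException

Note that unite() joins the roots of its two arguments, not the arguments
themselves, which keeps the forest shallow even without union-by-rank.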
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
deleted file mode 100644
index f0dbf35..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class RecordCollector {
-
- /**
- * A supplier of a {@link RecordCollector} instance.
- */
- public static interface Supplier {
- /**
- * Get the record collector.
- * @return the record collector
- */
- public RecordCollector recordCollector();
- }
-
- private static final Logger log = LoggerFactory.getLogger(RecordCollector.class);
-
- private final Producer<byte[], byte[]> producer;
- private final Map<TopicPartition, Long> offsets;
- private final Callback callback = new Callback() {
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
- offsets.put(tp, metadata.offset());
- } else {
- log.error("Error sending record: ", exception);
- }
- }
- };
-
-
- public RecordCollector(Producer<byte[], byte[]> producer) {
- this.producer = producer;
- this.offsets = new HashMap<>();
- }
-
- public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
- byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
- byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
- this.producer.send(new ProducerRecord<>(record.topic(), keyBytes, valBytes), callback);
- }
-
- public void flush() {
- this.producer.flush();
- }
-
- /**
- * Closes this RecordCollector
- */
- public void close() {
- producer.close();
- }
-
- /**
- * The last acknowledged offset per partition, as reported by the producer
- *
- * @return the map from TopicPartition to the last acknowledged offset
- */
- Map<TopicPartition, Long> offsets() {
- return this.offsets;
- }
-}
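
RecordCollector serializes keys and values itself and hands the producer raw
byte arrays; the send callback records the last acknowledged offset per
partition. A hedged sketch of standalone use (the broker address and topic
are hypothetical):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());

    RecordCollector collector =
        new RecordCollector(new KafkaProducer<byte[], byte[]>(props));
    StringSerializer strings = new StringSerializer();

    collector.send(new ProducerRecord<String, String>("output-topic", "key", "value"),
                   strings, strings);
    collector.flush();   // after flush(), offsets() covers everything acknowledged so far
    collector.close();

Because the producer is typed as Producer<byte[], byte[]>, one collector can
serve sinks with different key/value types: each send() supplies its own
serializers.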
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
deleted file mode 100644
index 66f78d2..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-import java.util.ArrayDeque;
-
-/**
- * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also tracks the
- * partition timestamp, defined as the minimum timestamp of the records in its queue; the partition
- * timestamp is monotonically increasing: once advanced, it is never decremented.
- */
-public class RecordQueue {
-
- private final SourceNode source;
- private final TopicPartition partition;
- private final ArrayDeque<StampedRecord> fifoQueue;
- private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
-
- private long partitionTime = TimestampTracker.NOT_KNOWN;
-
- public RecordQueue(TopicPartition partition, SourceNode source) {
- this.partition = partition;
- this.source = source;
-
- this.fifoQueue = new ArrayDeque<>();
- this.timeTracker = new MinTimestampTracker<>();
- }
-
- /**
- * Returns the corresponding source node in the topology
- *
- * @return SourceNode
- */
- public SourceNode source() {
- return source;
- }
-
- /**
- * Returns the partition with which this queue is associated
- *
- * @return TopicPartition
- */
- public TopicPartition partition() {
- return partition;
- }
-
- /**
- * Add a batch of {@link ConsumerRecord} into the queue
- *
- * @param rawRecords the raw records
- * @param timestampExtractor TimestampExtractor
- * @return the size of this queue
- */
- public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords, TimestampExtractor timestampExtractor) {
- for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
- // deserialize the raw record, extract the timestamp and put into the queue
- Object key = source.deserializeKey(rawRecord.topic(), rawRecord.key());
- Object value = source.deserializeValue(rawRecord.topic(), rawRecord.value());
-
- ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), key, value);
- long timestamp = timestampExtractor.extract(record);
-
- StampedRecord stampedRecord = new StampedRecord(record, timestamp);
-
- fifoQueue.addLast(stampedRecord);
- timeTracker.addElement(stampedRecord);
- }
-
- return size();
- }
-
- /**
- * Get the next {@link StampedRecord} from the queue
- *
- * @return StampedRecord
- */
- public StampedRecord poll() {
- StampedRecord elem = fifoQueue.pollFirst();
-
- if (elem == null)
- return null;
-
- timeTracker.removeElement(elem);
-
- // only advance the partition timestamp if the currently
- // tracked min timestamp exceeds its value
- long timestamp = timeTracker.get();
-
- if (timestamp > partitionTime)
- partitionTime = timestamp;
-
- return elem;
- }
-
- /**
- * Returns the number of records in the queue
- *
- * @return the number of records
- */
- public int size() {
- return fifoQueue.size();
- }
-
- /**
- * Tests if the queue is empty
- *
- * @return true if the queue is empty, otherwise false
- */
- public boolean isEmpty() {
- return fifoQueue.isEmpty();
- }
-
- /**
- * Returns the tracked partition timestamp
- *
- * @return timestamp
- */
- public long timestamp() {
- return partitionTime;
- }
-}
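
The behavior described in the class comment (FIFO processing order, minimum
timestamp as partition time) is easiest to see with two out-of-order records.
A sketch, assuming the MinTimestampTracker used above reports the smallest
timestamp still tracked; the extractor here is a hypothetical one that parses
the timestamp out of a "ts:payload" value:

    SourceNode<String, String> source = new SourceNode<String, String>(
        "demo-source", new StringDeserializer(), new StringDeserializer());
    RecordQueue queue = new RecordQueue(new TopicPartition("input-topic", 0), source);

    TimestampExtractor extractor = new TimestampExtractor() {
        public long extract(ConsumerRecord<Object, Object> record) {
            return Long.parseLong(((String) record.value()).split(":")[0]);
        }
    };

    queue.addRawRecords(Arrays.asList(
        new ConsumerRecord<byte[], byte[]>("input-topic", 0, 0L,
            "k".getBytes(), "300:late".getBytes()),
        new ConsumerRecord<byte[], byte[]>("input-topic", 0, 1L,
            "k".getBytes(), "100:early".getBytes())), extractor);

    queue.poll();        // FIFO: returns the record stamped 300 first
    queue.timestamp();   // partition time only ever moves forward as records drain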
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
deleted file mode 100644
index 9f01727..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-public class SinkNode<K, V> extends ProcessorNode<K, V> {
-
- private final String topic;
- private Serializer<K> keySerializer;
- private Serializer<V> valSerializer;
-
- private ProcessorContext context;
-
- public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
- super(name);
-
- this.topic = topic;
- this.keySerializer = keySerializer;
- this.valSerializer = valSerializer;
- }
-
- @Override
- public void addChild(ProcessorNode<?, ?> child) {
- throw new UnsupportedOperationException("sink node does not allow addChild");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(ProcessorContext context) {
- this.context = context;
- if (this.keySerializer == null) this.keySerializer = (Serializer<K>) context.keySerializer();
- if (this.valSerializer == null) this.valSerializer = (Serializer<V>) context.valueSerializer();
- }
-
- @Override
- public void process(K key, V value) {
- // serialize and send to this sink's topic
- RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
- collector.send(new ProducerRecord<>(topic, key, value), keySerializer, valSerializer);
- }
-
- @Override
- public void close() {
- // do nothing
- }
-}
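
Two wiring details are worth calling out. First, process() casts the
ProcessorContext to RecordCollector.Supplier, so a SinkNode only functions
inside a context that implements both interfaces; otherwise the cast fails
with a ClassCastException. Second, passing null serializers is an explicit
opt-in to the context's defaults, resolved once in init(). A construction
sketch (names hypothetical):

    // Explicit serializers; passing null instead would defer to
    // context.keySerializer() / context.valueSerializer() at init() time.
    SinkNode<String, String> sink = new SinkNode<String, String>(
        "demo-sink", "output-topic",
        new StringSerializer(), new StringSerializer());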
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
deleted file mode 100644
index fa4afaf..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-public class SourceNode<K, V> extends ProcessorNode<K, V> {
-
- private Deserializer<K> keyDeserializer;
- private Deserializer<V> valDeserializer;
- private ProcessorContext context;
-
- public SourceNode(String name, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
- super(name);
-
- this.keyDeserializer = keyDeserializer;
- this.valDeserializer = valDeserializer;
- }
-
- public K deserializeKey(String topic, byte[] data) {
- return keyDeserializer.deserialize(topic, data);
- }
-
- public V deserializeValue(String topic, byte[] data) {
- return valDeserializer.deserialize(topic, data);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(ProcessorContext context) {
- this.context = context;
-
- // if deserializers are null, get the default ones from the context
- if (this.keyDeserializer == null) this.keyDeserializer = (Deserializer<K>) context.keyDeserializer();
- if (this.valDeserializer == null) this.valDeserializer = (Deserializer<V>) context.valueDeserializer();
- }
-
- @Override
- public void process(K key, V value) {
- context.forward(key, value);
- }
-
- @Override
- public void close() {
- // do nothing
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
deleted file mode 100644
index 4e44667..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-public class Stamped<V> implements Comparable<Stamped<?>> {
-
- public final V value;
- public final long timestamp;
-
- public Stamped(V value, long timestamp) {
- this.value = value;
- this.timestamp = timestamp;
- }
-
- @Override
- public int compareTo(Stamped<?> other) {
- long otherTimestamp = other.timestamp;
-
- if (timestamp < otherTimestamp) return -1;
- else if (timestamp > otherTimestamp) return 1;
- return 0;
- }
-}
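
compareTo() orders instances by timestamp alone, which is exactly what
PunctuationQueue's PriorityQueue relies on to keep the earliest deadline at
its head. For example:

    Stamped<String> earlier = new Stamped<String>("a", 100L);
    Stamped<String> later = new Stamped<String>("b", 200L);
    earlier.compareTo(later);   // -1: smaller timestamps sort first

Note that two instances with equal timestamps compare as equal regardless of
their values, so the ordering among simultaneous deadlines is unspecified.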