You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 01:27:32 UTC

[6/8] kafka git commit: MINOR: remove Kafka Streams in 0.9.0

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
deleted file mode 100644
index 893f7de..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.QuickUnion;
-import org.apache.kafka.streams.processor.internals.SinkNode;
-import org.apache.kafka.streams.processor.internals.SourceNode;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A component that is used to build a {@link ProcessorTopology}. A topology contains an acyclic graph of sources, processors,
- * and sinks. A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to
- * its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
- * processes those messages, and optionally forwards new messages to one or all of its children. Finally, a {@link SinkNode sink}
- * is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
- * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link KafkaStreaming} instance
- * that will then {@link KafkaStreaming#start() begin consuming, processing, and producing messages}.
- * <p>
- * Nodes must be added parents-first: a processor or sink may only name parents that have already been added to this builder.
- */
-public class TopologyBuilder {
-
-    // node factories in a topological order
-    private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
-
-    // every topic consumed by some source node; a topic may be registered by at most one source
-    private final Set<String> sourceTopicNames = new HashSet<>();
-
-    // groups node names: add() registers a node, unite() merges groups, root() identifies the group of a node
-    private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
-    private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
-    // source node name -> topics consumed by that source
-    private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
-    // lazily computed by makeNodeGroups(); reset to null whenever the graph is modified
-    private Map<Integer, Set<String>> nodeGroups = null;
-
-    // state store name -> supplier
-    private final Map<String, StateStoreSupplier> stateStores = new HashMap<>();
-    // state store name -> names of the processors connected to that store
-    private final Map<String, Set<String>> stateStoreUsers = new HashMap<>();
-
-    /**
-     * Base class of the per-node recipes kept by the builder; {@link #build()} creates a fresh node instance.
-     */
-    private static abstract class NodeFactory {
-        public final String name;
-
-        NodeFactory(String name) {
-            this.name = name;
-        }
-
-        public abstract ProcessorNode build();
-    }
-
-    private static class ProcessorNodeFactory extends NodeFactory {
-        public final String[] parents;
-        private final ProcessorSupplier supplier;
-        private final Set<String> stateStoreNames = new HashSet<>();
-
-        public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) {
-            super(name);
-            // defensive copy so later changes to the caller's array cannot alter the topology
-            this.parents = parents.clone();
-            this.supplier = supplier;
-        }
-
-        public void addStateStore(String stateStoreName) {
-            stateStoreNames.add(stateStoreName);
-        }
-
-        @Override
-        public ProcessorNode build() {
-            return new ProcessorNode(name, supplier.get(), stateStoreNames);
-        }
-    }
-
-    private static class SourceNodeFactory extends NodeFactory {
-        public final String[] topics;
-        private final Deserializer keyDeserializer;
-        private final Deserializer valDeserializer;
-
-        private SourceNodeFactory(String name, String[] topics, Deserializer keyDeserializer, Deserializer valDeserializer) {
-            super(name);
-            this.topics = topics.clone();
-            this.keyDeserializer = keyDeserializer;
-            this.valDeserializer = valDeserializer;
-        }
-
-        @Override
-        public ProcessorNode build() {
-            return new SourceNode(name, keyDeserializer, valDeserializer);
-        }
-    }
-
-    private static class SinkNodeFactory extends NodeFactory {
-        public final String[] parents;
-        public final String topic;
-        private final Serializer keySerializer;
-        private final Serializer valSerializer;
-
-        private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer) {
-            super(name);
-            this.parents = parents.clone();
-            this.topic = topic;
-            this.keySerializer = keySerializer;
-            this.valSerializer = valSerializer;
-        }
-
-        @Override
-        public ProcessorNode build() {
-            return new SinkNode(name, topic, keySerializer, valSerializer);
-        }
-    }
-
-    /**
-     * Create a new builder.
-     */
-    public TopologyBuilder() {}
-
-    /**
-     * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
-     * The source will use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
-     * {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
-     * {@link StreamingConfig streaming configuration}.
-     *
-     * @param name the unique name of the source used to reference this node when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param topics the name of one or more Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyException if a node with this name already exists or a topic is already consumed by a source
-     */
-    public final TopologyBuilder addSource(String name, String... topics) {
-        return addSource(name, (Deserializer) null, (Deserializer) null, topics);
-    }
-
-    /**
-     * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
-     * The source will use the specified key and value deserializers.
-     *
-     * @param name the unique name of the source used to reference this node when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
-     * should use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
-     * {@link StreamingConfig streaming configuration}
-     * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
-     * should use the {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
-     * {@link StreamingConfig streaming configuration}
-     * @param topics the name of one or more Kafka topics that this source is to consume
-     * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyException if a node with this name already exists or a topic is already consumed by a source
-     */
-    public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
-        if (nodeFactories.containsKey(name))
-            throw new TopologyException("Processor " + name + " is already added.");
-
-        for (String topic : topics) {
-            if (sourceTopicNames.contains(topic))
-                throw new TopologyException("Topic " + topic + " has already been registered by another source.");
-
-            sourceTopicNames.add(topic);
-        }
-
-        nodeFactories.put(name, new SourceNodeFactory(name, topics, keyDeserializer, valDeserializer));
-        nodeToTopics.put(name, topics.clone());
-        nodeGrouper.add(name);
-        // the graph changed, so any previously computed grouping is stale
-        nodeGroups = null;
-
-        return this;
-    }
-
-    /**
-     * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
-     * The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
-     * {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
-     * {@link StreamingConfig streaming configuration}.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its messages
-     * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
-     * and write to its topic
-     * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyException if a node with this name already exists, or a parent is the sink itself or not added yet
-     */
-    public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
-        return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
-    }
-
-    /**
-     * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
-     * The sink will use the specified key and value serializers.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its messages
-     * @param keySerializer the {@link Serializer key serializer} used when producing messages; may be null if the sink
-     * should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
-     * {@link StreamingConfig streaming configuration}
-     * @param valSerializer the {@link Serializer value serializer} used when producing messages; may be null if the sink
-     * should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
-     * {@link StreamingConfig streaming configuration}
-     * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
-     * and write to its topic
-     * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyException if a node with this name already exists, or a parent is the sink itself or not added yet
-     */
-    public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
-        if (nodeFactories.containsKey(name))
-            throw new TopologyException("Processor " + name + " is already added.");
-
-        String[] parents = validatedParents(name, parentNames);
-
-        nodeFactories.put(name, new SinkNodeFactory(name, parents, topic, keySerializer, valSerializer));
-        // register the sink in the same group as its parents, exactly as addProcessor does; makeNodeGroups()
-        // looks up the group root of every non-source node, sinks included
-        nodeGrouper.add(name);
-        nodeGrouper.unite(name, parents);
-        nodeGroups = null;
-        return this;
-    }
-
-    /**
-     * Add a new processor node that receives and processes messages output by one or more parent source or processor node.
-     * Any new messages output by this processor will be forwarded to its child processor or sink nodes.
-     * @param name the unique name of the processor node
-     * @param supplier the supplier used to obtain this node's {@link Processor} instance
-     * @param parentNames the name of one or more source or processor nodes whose output messages this processor should receive
-     * and process
-     * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyException if a node with this name already exists, or a parent is the node itself or not added yet
-     */
-    public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
-        if (nodeFactories.containsKey(name))
-            throw new TopologyException("Processor " + name + " is already added.");
-
-        String[] parents = validatedParents(name, parentNames);
-
-        nodeFactories.put(name, new ProcessorNodeFactory(name, parents, supplier));
-        nodeGrouper.add(name);
-        nodeGrouper.unite(name, parents);
-        nodeGroups = null;
-        return this;
-    }
-
-    /**
-     * Checks the parents named for a new node and returns them as a non-null array.
-     * A null array is treated as "no parents" so that the node factories never have to clone a null reference.
-     */
-    private String[] validatedParents(String name, String[] parentNames) {
-        if (parentNames == null)
-            return new String[0];
-
-        for (String parent : parentNames) {
-            if (parent.equals(name)) {
-                throw new TopologyException("Processor " + name + " cannot be a parent of itself.");
-            }
-            if (!nodeFactories.containsKey(parent)) {
-                throw new TopologyException("Parent processor " + parent + " is not added yet.");
-            }
-        }
-        return parentNames;
-    }
-
-    /**
-     * Adds a state store
-     *
-     * @param supplier the supplier used to obtain this state store {@link StateStore} instance
-     * @param processorNames the names of the processors that should be connected to the state store; may be empty
-     * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyException if a store with the same name already exists, or a named node is unknown or not a processor
-     */
-    public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
-        if (stateStores.containsKey(supplier.name())) {
-            throw new TopologyException("StateStore " + supplier.name() + " is already added.");
-        }
-        stateStores.put(supplier.name(), supplier);
-        stateStoreUsers.put(supplier.name(), new HashSet<String>());
-
-        if (processorNames != null) {
-            for (String processorName : processorNames) {
-                connectProcessorAndStateStore(processorName, supplier.name());
-            }
-        }
-
-        return this;
-    }
-
-    /**
-     * Connects the processor and the state stores
-     *
-     * @param processorName the name of the processor
-     * @param stateStoreNames the names of state stores that the processor uses
-     * @return this builder instance so methods can be chained together; never null
-     * @throws TopologyException if a store or the node is unknown, or the node is not a processor
-     */
-    public final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
-        if (stateStoreNames != null) {
-            for (String stateStoreName : stateStoreNames) {
-                connectProcessorAndStateStore(processorName, stateStoreName);
-            }
-        }
-
-        return this;
-    }
-
-    private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
-        if (!stateStores.containsKey(stateStoreName))
-            throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
-        if (!nodeFactories.containsKey(processorName))
-            throw new TopologyException("Processor " + processorName + " is not added yet.");
-
-        // reject sources and sinks before touching any builder state, so a failed call leaves no trace
-        NodeFactory factory = nodeFactories.get(processorName);
-        if (!(factory instanceof ProcessorNodeFactory))
-            throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
-
-        // all users of one store must end up in the same node group; uniting with any one
-        // existing user is enough because existing users have already been united with each other
-        Set<String> users = stateStoreUsers.get(stateStoreName);
-        Iterator<String> iter = users.iterator();
-        if (iter.hasNext()) {
-            String user = iter.next();
-            nodeGrouper.unite(user, processorName);
-        }
-        users.add(processorName);
-
-        ((ProcessorNodeFactory) factory).addStateStore(stateStoreName);
-        nodeGroups = null;
-    }
-
-    /**
-     * Returns the map of topic groups keyed by the group id.
-     * A topic group is a group of topics in the same task.
-     *
-     * @return groups of topic names; the map and its sets are unmodifiable
-     */
-    public Map<Integer, Set<String>> topicGroups() {
-        Map<Integer, Set<String>> topicGroups = new HashMap<>();
-
-        for (Map.Entry<Integer, Set<String>> entry : nodeGroups().entrySet()) {
-            Set<String> topicGroup = new HashSet<>();
-            for (String node : entry.getValue()) {
-                // only source nodes have an entry in nodeToTopics
-                String[] topics = nodeToTopics.get(node);
-                if (topics != null)
-                    topicGroup.addAll(Arrays.asList(topics));
-            }
-            topicGroups.put(entry.getKey(), Collections.unmodifiableSet(topicGroup));
-        }
-
-        return Collections.unmodifiableMap(topicGroups);
-    }
-
-    /**
-     * Returns the map of node groups keyed by the topic group id.
-     * The grouping is computed on first use and cached until the builder is modified again.
-     *
-     * @return groups of node names
-     */
-    public Map<Integer, Set<String>> nodeGroups() {
-        if (nodeGroups == null)
-            nodeGroups = makeNodeGroups();
-
-        return nodeGroups;
-    }
-
-    private Map<Integer, Set<String>> makeNodeGroups() {
-        HashMap<Integer, Set<String>> groups = new HashMap<>();
-        HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
-
-        // Go through source nodes first. This makes the group id assignment easy to predict in tests
-        for (String nodeName : Utils.sorted(nodeToTopics.keySet())) {
-            assignToGroup(nodeName, nodeGrouper.root(nodeName), rootToNodeGroup, groups);
-        }
-
-        // Go through non-source nodes
-        for (String nodeName : Utils.sorted(nodeFactories.keySet())) {
-            if (!nodeToTopics.containsKey(nodeName)) {
-                assignToGroup(nodeName, nodeGrouper.root(nodeName), rootToNodeGroup, groups);
-            }
-        }
-
-        return groups;
-    }
-
-    /**
-     * Puts the node into the group of its root, creating the group on first sight of that root.
-     * Group ids are handed out consecutively from 0, so the next free id always equals the number of groups.
-     */
-    private static void assignToGroup(String nodeName,
-                                      String root,
-                                      Map<String, Set<String>> rootToNodeGroup,
-                                      Map<Integer, Set<String>> groups) {
-        Set<String> nodeGroup = rootToNodeGroup.get(root);
-        if (nodeGroup == null) {
-            nodeGroup = new HashSet<>();
-            rootToNodeGroup.put(root, nodeGroup);
-            groups.put(groups.size(), nodeGroup);
-        }
-        nodeGroup.add(nodeName);
-    }
-
-    /**
-     * Asserts that the streams of the specified source nodes must be copartitioned.
-     *
-     * @param sourceNodes a set of source node names
-     * @return this builder instance so methods can be chained together; never null
-     */
-    public final TopologyBuilder copartitionSources(Collection<String> sourceNodes) {
-        copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
-        return this;
-    }
-
-    /**
-     * Returns the copartition groups.
-     * A copartition group is a group of topics that are required to be copartitioned.
-     *
-     * @return groups of topic names; the collection and its sets are unmodifiable
-     */
-    public Collection<Set<String>> copartitionGroups() {
-        List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
-        for (Set<String> nodeNames : copartitionSourceGroups) {
-            Set<String> copartitionGroup = new HashSet<>();
-            for (String node : nodeNames) {
-                // names that are not registered source nodes contribute no topics
-                String[] topics = nodeToTopics.get(node);
-                if (topics != null)
-                    copartitionGroup.addAll(Arrays.asList(topics));
-            }
-            list.add(Collections.unmodifiableSet(copartitionGroup));
-        }
-        return Collections.unmodifiableList(list);
-    }
-
-    /**
-     * Build the topology for the specified topic group. This is called automatically when passing this builder into the
-     * {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor.
-     *
-     * @param topicGroupId the id of the topic group whose nodes should be built; if null, or if no group has this id,
-     * the full topology is built
-     * @return a new topology containing freshly built nodes
-     * @throws KafkaException if a node could not be constructed
-     * @see KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)
-     */
-    public ProcessorTopology build(Integer topicGroupId) {
-        Set<String> nodeGroup;
-        if (topicGroupId != null) {
-            nodeGroup = nodeGroups().get(topicGroupId);
-        } else {
-            // when nodeGroup is null, we build the full topology. this is used in some tests.
-            nodeGroup = null;
-        }
-        return build(nodeGroup);
-    }
-
-    @SuppressWarnings("unchecked")
-    private ProcessorTopology build(Set<String> nodeGroup) {
-        List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
-        Map<String, ProcessorNode> processorMap = new HashMap<>();
-        Map<String, SourceNode> topicSourceMap = new HashMap<>();
-        Map<String, StateStoreSupplier> stateStoreMap = new HashMap<>();
-
-        try {
-            // create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
-            for (NodeFactory factory : nodeFactories.values()) {
-                // a null nodeGroup selects every node
-                if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-                    ProcessorNode node = factory.build();
-                    processorNodes.add(node);
-                    processorMap.put(node.name(), node);
-
-                    if (factory instanceof ProcessorNodeFactory) {
-                        ProcessorNodeFactory processorFactory = (ProcessorNodeFactory) factory;
-                        for (String parent : processorFactory.parents) {
-                            processorMap.get(parent).addChild(node);
-                        }
-                        for (String stateStoreName : processorFactory.stateStoreNames) {
-                            if (!stateStoreMap.containsKey(stateStoreName)) {
-                                stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
-                            }
-                        }
-                    } else if (factory instanceof SourceNodeFactory) {
-                        for (String topic : ((SourceNodeFactory) factory).topics) {
-                            topicSourceMap.put(topic, (SourceNode) node);
-                        }
-                    } else if (factory instanceof SinkNodeFactory) {
-                        for (String parent : ((SinkNodeFactory) factory).parents) {
-                            processorMap.get(parent).addChild(node);
-                        }
-                    } else {
-                        throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
-                    }
-                }
-            }
-        } catch (Exception e) {
-            // keep the original failure as the cause so the real problem is not lost
-            throw new KafkaException("ProcessorNode construction failed: this should not happen.", e);
-        }
-
-        return new ProcessorTopology(processorNodes, topicSourceMap, new ArrayList<>(stateStoreMap.values()));
-    }
-
-    /**
-     * Get the names of topics that are to be consumed by the source nodes created by this builder.
-     * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
-     */
-    public Set<String> sourceTopics() {
-        return Collections.unmodifiableSet(sourceTopicNames);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
deleted file mode 100644
index 99d1405..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.KafkaException;
-
-/**
- * Signals a problem found while building a processor topology.
- */
-public class TopologyException extends KafkaException {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * @param message the complete exception message, used verbatim
-     */
-    public TopologyException(String message) {
-        super(message);
-    }
-
-    /**
-     * Same as {@link #TopologyException(String, Object, String)} without a detail message.
-     *
-     * @param name currently not included in the exception message
-     * @param value currently not included in the exception message
-     */
-    public TopologyException(String name, Object value) {
-        this(name, value, null);
-    }
-
-    /**
-     * @param name currently not included in the exception message
-     * @param value currently not included in the exception message
-     * @param message optional detail appended to the fixed prefix; may be null
-     */
-    public TopologyException(String name, Object value, String message) {
-        super(describe(message));
-    }
-
-    // builds the message text: the fixed prefix, followed by ": <detail>" when a detail is given
-    private static String describe(String detail) {
-        final String prefix = "Invalid topology building";
-        if (detail == null) {
-            return prefix;
-        }
-        return prefix + ": " + detail;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
deleted file mode 100644
index f7b14ad..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.PartitionGrouper;
-import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A {@link PartitionAssignor} that hands out whole tasks to clients. The task-to-partition mapping comes from the
- * configured {@link PartitionGrouper}; the task ids that belong to each assigned partition travel in the
- * assignment's user data and are decoded again in {@link #onAssignment(Assignment)}.
- */
-public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Configurable {
-
-    private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
-
-    // version tag written as the first int of the user data
-    private static final int USER_DATA_VERSION = 1;
-    // size of the version tag: one int
-    private static final int VERSION_SIZE = 4;
-    // size of one encoded task id: two ints (topic group id, partition)
-    private static final int TASK_ID_SIZE = 8;
-
-    private PartitionGrouper partitionGrouper;
-    // set by onAssignment(); null until the first assignment has been received
-    private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
-
-    /**
-     * Picks up the {@link PartitionGrouper} instance passed through the configs and registers this assignor with it.
-     *
-     * @param configs the configuration; must hold a {@link PartitionGrouper} under
-     * {@code StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE}
-     * @throws KafkaException if the entry is missing or is not a {@link PartitionGrouper}
-     */
-    @Override
-    public void configure(Map<String, ?> configs) {
-        Object o = configs.get(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE);
-        if (o == null)
-            throw new KafkaException("PartitionGrouper is not specified");
-
-        if (!(o instanceof PartitionGrouper))
-            throw new KafkaException(o.getClass().getName() + " is not an instance of " + PartitionGrouper.class.getName());
-
-        partitionGrouper = (PartitionGrouper) o;
-        partitionGrouper.partitionAssignor(this);
-    }
-
-    @Override
-    public String name() {
-        return "streaming";
-    }
-
-    @Override
-    public Subscription subscription(Set<String> topics) {
-        return new Subscription(new ArrayList<>(topics));
-    }
-
-    /**
-     * Distributes the tasks over the clients round-robin: with {@code c} clients, the client at index {@code i}
-     * receives the tasks at indexes {@code i, i + c, i + 2c, ...}, together with all partitions of those tasks.
-     *
-     * @param metadata the cluster metadata handed to the partition grouper
-     * @param subscriptions the subscriptions keyed by client id
-     * @return one assignment per client id; the user data holds one task id per assigned partition, in partition order
-     */
-    @Override
-    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
-        Map<TaskId, Set<TopicPartition>> partitionGroups = partitionGrouper.partitionGroups(metadata);
-
-        String[] clientIds = subscriptions.keySet().toArray(new String[subscriptions.size()]);
-        TaskId[] taskIds = partitionGroups.keySet().toArray(new TaskId[partitionGroups.size()]);
-
-        Map<String, Assignment> assignment = new HashMap<>();
-
-        for (int i = 0; i < clientIds.length; i++) {
-            List<TopicPartition> partitions = new ArrayList<>();
-            List<TaskId> ids = new ArrayList<>();
-            for (int j = i; j < taskIds.length; j += clientIds.length) {
-                TaskId taskId = taskIds[j];
-                for (TopicPartition partition : partitionGroups.get(taskId)) {
-                    // the two lists stay index-aligned: ids.get(k) is the task of partitions.get(k)
-                    partitions.add(partition);
-                    ids.add(taskId);
-                }
-            }
-            assignment.put(clientIds[i], new Assignment(partitions, encode(ids)));
-        }
-
-        return assignment;
-    }
-
-    // writes the version tag followed by (topicGroupId, partition) for every task id, then rewinds for reading
-    private static ByteBuffer encode(List<TaskId> ids) {
-        ByteBuffer buf = ByteBuffer.allocate(VERSION_SIZE + ids.size() * TASK_ID_SIZE);
-        buf.putInt(USER_DATA_VERSION);
-        for (TaskId id : ids) {
-            buf.putInt(id.topicGroupId);
-            buf.putInt(id.partition);
-        }
-        buf.rewind();
-        return buf;
-    }
-
-    /**
-     * Decodes the task ids from the assignment's user data and remembers which tasks each partition belongs to.
-     * The previously remembered mapping is replaced only if decoding succeeds.
-     *
-     * @param assignment the assignment received for this client
-     * @throws KafkaException if the user data is missing, has an unknown version, or is too short for the partitions
-     */
-    @Override
-    public void onAssignment(Assignment assignment) {
-        List<TopicPartition> partitions = assignment.partitions();
-        ByteBuffer data = assignment.userData();
-        if (data == null)
-            throw logged("assignment data is missing");
-        data.rewind();
-
-        if (data.remaining() < VERSION_SIZE)
-            throw logged("assignment data is too short to hold a version");
-
-        // check version
-        int version = data.getInt();
-        if (version != USER_DATA_VERSION)
-            throw logged("unknown assignment data version: " + version);
-
-        // exactly one task id is expected per partition
-        if (data.remaining() < partitions.size() * TASK_ID_SIZE)
-            throw logged("assignment data holds fewer task ids than the " + partitions.size() + " assigned partitions");
-
-        Map<TopicPartition, Set<TaskId>> decoded = new HashMap<>();
-        for (TopicPartition partition : partitions) {
-            Set<TaskId> taskIds = decoded.get(partition);
-            if (taskIds == null) {
-                taskIds = new HashSet<>();
-                decoded.put(partition, taskIds);
-            }
-            // decode a task id
-            taskIds.add(new TaskId(data.getInt(), data.getInt()));
-        }
-        this.partitionToTaskIds = decoded;
-    }
-
-    // creates the exception, logs it, and hands it back to be thrown by the caller
-    private static KafkaException logged(String message) {
-        KafkaException ex = new KafkaException(message);
-        log.error(ex.getMessage(), ex);
-        return ex;
-    }
-
-    /**
-     * Returns the tasks the given partition belongs to according to the most recent assignment.
-     *
-     * @param partition the partition to look up
-     * @return the task ids, or null if no assignment has been received yet or the partition is not part of it
-     */
-    public Set<TaskId> taskIds(TopicPartition partition) {
-        Map<TopicPartition, Set<TaskId>> current = partitionToTaskIds;
-        return current == null ? null : current.get(partition);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
deleted file mode 100644
index 717df2c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import java.util.LinkedList;
-
-/**
- * MinTimestampTracker implements {@link TimestampTracker} and reports the minimum
- * timestamp among the stamped elements it currently tracks.
- * <p>
- * Only elements that can still become the minimum are retained: when an element is
- * added, every retained element with a greater or equal timestamp is dropped first.
- */
-public class MinTimestampTracker<E> implements TimestampTracker<E> {
-
-    // timestamps strictly increase from the first to the last entry, so the first
-    // entry always carries the current minimum
-    private final LinkedList<Stamped<E>> descendingSubsequence = new LinkedList<>();
-
-    // in the case that incoming traffic is very small, the records maybe put and polled
-    // within a single iteration, in this case we need to remember the last polled
-    // record's timestamp
-    private long lastKnownTime = NOT_KNOWN;
-
-    /**
-     * Starts tracking the given element.
-     *
-     * @param elem the element to add; must not be null
-     * @throws NullPointerException if {@code elem} is null
-     */
-    public void addElement(Stamped<E> elem) {
-        if (elem == null) throw new NullPointerException();
-
-        // drop trailing entries that can never be the minimum again once elem is present
-        Stamped<E> minElem = descendingSubsequence.peekLast();
-        while (minElem != null && minElem.timestamp >= elem.timestamp) {
-            descendingSubsequence.removeLast();
-            minElem = descendingSubsequence.peekLast();
-        }
-        descendingSubsequence.offerLast(elem);
-    }
-
-    /**
-     * Stops tracking the given element. The element is only unlinked when it is the
-     * current first (minimum) entry; if nothing is tracked afterwards, its timestamp
-     * is remembered as the last known time.
-     *
-     * @param elem the element to remove; a null element is ignored
-     */
-    public void removeElement(Stamped<E> elem) {
-        // a null element used to hit a NullPointerException below whenever the
-        // tracker was empty; there is nothing to remove or remember for it
-        if (elem == null)
-            return;
-
-        // identity comparison: only the very same instance that was added is unlinked
-        if (descendingSubsequence.peekFirst() == elem)
-            descendingSubsequence.removeFirst();
-
-        if (descendingSubsequence.isEmpty())
-            lastKnownTime = elem.timestamp;
-    }
-
-    /**
-     * @return the number of retained candidate elements, which can be smaller than
-     *         the number of elements added and not yet removed
-     */
-    public int size() {
-        return descendingSubsequence.size();
-    }
-
-    /**
-     * @return the minimum tracked timestamp, or the last known time when no element
-     *         is currently retained
-     */
-    public long get() {
-        Stamped<E> stamped = descendingSubsequence.peekFirst();
-
-        if (stamped == null)
-            return lastKnownTime;
-        else
-            return stamped.timestamp;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
deleted file mode 100644
index d888085..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-
-/**
- * A PartitionGroup is composed from a set of partitions. It also maintains the timestamp of this
- * group, hence the associated task as the min timestamp across all partitions in the group.
- */
-public class PartitionGroup {
-
-    // one record queue per partition that belongs to this group
-    private final Map<TopicPartition, RecordQueue> partitionQueues;
-
-    // the non-empty queues, ordered by their current timestamp (smallest first)
-    private final PriorityQueue<RecordQueue> queuesByTime;
-
-    private final TimestampExtractor timestampExtractor;
-
-    /**
-     * Output holder filled in by {@link #nextRecord(RecordInfo)} with the queue the
-     * returned record was taken from.
-     */
-    public static class RecordInfo {
-        public RecordQueue queue;
-
-        public ProcessorNode node() {
-            return queue.source();
-        }
-
-        public TopicPartition partition() {
-            return queue.partition();
-        }
-    }
-
-    // since task is thread-safe, we do not need to synchronize on local variables
-    private int totalBuffered;
-
-    /**
-     * @param partitionQueues    the record queue of every partition in this group
-     * @param timestampExtractor the extractor handed to the queues when raw records are added
-     */
-    public PartitionGroup(Map<TopicPartition, RecordQueue> partitionQueues, TimestampExtractor timestampExtractor) {
-        this.queuesByTime = new PriorityQueue<>(partitionQueues.size(), new Comparator<RecordQueue>() {
-
-            @Override
-            public int compare(RecordQueue queue1, RecordQueue queue2) {
-                return Long.compare(queue1.timestamp(), queue2.timestamp());
-            }
-        });
-
-        this.partitionQueues = partitionQueues;
-
-        this.timestampExtractor = timestampExtractor;
-
-        this.totalBuffered = 0;
-    }
-
-    /**
-     * Get the next record and the queue it came from.
-     *
-     * @param info holder whose {@code queue} field is set to the queue that was polled,
-     *             or to null when no queue currently holds records
-     * @return the next record from the queue with the smallest timestamp, or null if there is none
-     */
-    public StampedRecord nextRecord(RecordInfo info) {
-        StampedRecord record = null;
-
-        RecordQueue queue = queuesByTime.poll();
-        if (queue != null) {
-            // get the first record from this queue.
-            record = queue.poll();
-
-            // re-insert the queue so it is ordered by its new timestamp; drained queues stay out
-            if (queue.size() > 0) {
-                queuesByTime.offer(queue);
-            }
-        }
-        info.queue = queue;
-
-        if (record != null) totalBuffered--;
-
-        return record;
-    }
-
-    /**
-     * Adds raw records to this partition group
-     *
-     * @param partition the partition
-     * @param rawRecords  the raw records
-     * @return the queue size for the partition
-     * @throws KafkaException if the partition does not belong to this group
-     */
-    public int addRawRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
-        RecordQueue recordQueue = partitionQueues.get(partition);
-
-        // fail with the same explicit error as numBuffered(TopicPartition) instead of a bare NPE
-        if (recordQueue == null)
-            throw new KafkaException("Record's partition does not belong to this partition-group.");
-
-        int oldSize = recordQueue.size();
-        int newSize = recordQueue.addRawRecords(rawRecords, timestampExtractor);
-
-        // add this record queue to be considered for processing in the future if it was empty before
-        if (oldSize == 0 && newSize > 0) {
-            queuesByTime.offer(recordQueue);
-        }
-
-        totalBuffered += newSize - oldSize;
-
-        return newSize;
-    }
-
-    /**
-     * @return a read-only view of the partitions in this group
-     */
-    public Set<TopicPartition> partitions() {
-        return Collections.unmodifiableSet(partitionQueues.keySet());
-    }
-
-    /**
-     * Return the timestamp of this partition group as the smallest
-     * partition timestamp among all its partitions
-     */
-    public long timestamp() {
-        if (queuesByTime.isEmpty()) {
-            // if there is no data in all partitions, return the smallest of their last known times
-            long timestamp = Long.MAX_VALUE;
-            for (RecordQueue queue : partitionQueues.values()) {
-                if (timestamp > queue.timestamp())
-                    timestamp = queue.timestamp();
-            }
-            return timestamp;
-        } else {
-            return queuesByTime.peek().timestamp();
-        }
-    }
-
-    /**
-     * @param partition a partition of this group
-     * @return the number of records buffered for the partition
-     * @throws KafkaException if the partition does not belong to this group
-     */
-    public int numBuffered(TopicPartition partition) {
-        RecordQueue recordQueue = partitionQueues.get(partition);
-
-        if (recordQueue == null)
-            throw new KafkaException("Record's partition does not belong to this partition-group.");
-
-        return recordQueue.size();
-    }
-
-    /**
-     * @return the size of the queue with the smallest timestamp, or 0 when all queues are empty
-     */
-    public int topQueueSize() {
-        RecordQueue recordQueue = queuesByTime.peek();
-        return (recordQueue == null) ? 0 : recordQueue.size();
-    }
-
-    /**
-     * @return the running total of records buffered across all partitions of this group
-     */
-    public int numBuffered() {
-        return totalBuffered;
-    }
-
-    /**
-     * Drops all queues. Note that this also clears the map passed to the constructor.
-     */
-    public void close() {
-        queuesByTime.clear();
-        partitionQueues.clear();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
deleted file mode 100644
index 1321cc5..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.TaskId;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-/**
- * {@link ProcessorContext} implementation that delegates record metadata, forwarding,
- * commit and scheduling requests to its {@link StreamTask}, and state store
- * registration and lookup to its {@link ProcessorStateManager}.
- */
-public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
-
-    private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class);
-
-    private final TaskId id;
-    private final StreamTask task;
-    private final StreamingMetrics metrics;
-    private final RecordCollector collector;
-    private final ProcessorStateManager stateMgr;
-
-    private final Serializer<?> keySerializer;
-    private final Serializer<?> valSerializer;
-    private final Deserializer<?> keyDeserializer;
-    private final Deserializer<?> valDeserializer;
-
-    // once set, no further state stores may be registered
-    private boolean initialized;
-
-    /**
-     * Creates a context whose (de)serializers are taken from the given config.
-     */
-    @SuppressWarnings("unchecked")
-    public ProcessorContextImpl(TaskId id,
-                                StreamTask task,
-                                StreamingConfig config,
-                                RecordCollector collector,
-                                ProcessorStateManager stateMgr,
-                                StreamingMetrics metrics) {
-        this.id = id;
-        this.task = task;
-        this.metrics = metrics;
-        this.collector = collector;
-        this.stateMgr = stateMgr;
-
-        this.keySerializer = config.keySerializer();
-        this.valSerializer = config.valueSerializer();
-        this.keyDeserializer = config.keyDeserializer();
-        this.valDeserializer = config.valueDeserializer();
-
-        this.initialized = false;
-    }
-
-    /**
-     * Marks the initialization phase as finished; later calls to
-     * {@link #register(StateStore, StateRestoreCallback)} are rejected.
-     */
-    public void initialized() {
-        this.initialized = true;
-    }
-
-    public TaskId id() {
-        return id;
-    }
-
-    public ProcessorStateManager getStateMgr() {
-        return stateMgr;
-    }
-
-    @Override
-    public RecordCollector recordCollector() {
-        return this.collector;
-    }
-
-    @Override
-    public Serializer<?> keySerializer() {
-        return this.keySerializer;
-    }
-
-    @Override
-    public Serializer<?> valueSerializer() {
-        return this.valSerializer;
-    }
-
-    @Override
-    public Deserializer<?> keyDeserializer() {
-        return this.keyDeserializer;
-    }
-
-    @Override
-    public Deserializer<?> valueDeserializer() {
-        return this.valDeserializer;
-    }
-
-    @Override
-    public File stateDir() {
-        return stateMgr.baseDir();
-    }
-
-    @Override
-    public StreamingMetrics metrics() {
-        return metrics;
-    }
-
-    /**
-     * Registers a state store with the state manager.
-     *
-     * @throws KafkaException if called after {@link #initialized()}
-     */
-    @Override
-    public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
-        if (initialized)
-            throw new KafkaException("Can only create state stores during initialization.");
-
-        stateMgr.register(store, stateRestoreCallback);
-    }
-
-    /**
-     * Returns the named state store if the node currently reported by the task lists it.
-     *
-     * @throws KafkaException if the task reports no current node, or the node does not
-     *                        list the store among its state stores
-     */
-    @Override
-    public StateStore getStateStore(String name) {
-        ProcessorNode node = task.node();
-
-        if (node == null)
-            throw new KafkaException("accessing from an unknown node");
-
-        // nodes built with the name-only ProcessorNode constructor carry a null store set;
-        // treat that as "no access" rather than failing with a NullPointerException
-        if (node.stateStores == null || !node.stateStores.contains(name))
-            throw new KafkaException("Processor " + node.name() + " has no access to StateStore " + name);
-
-        return stateMgr.getStore(name);
-    }
-
-    /**
-     * @throws IllegalStateException if the task has no current record
-     */
-    @Override
-    public String topic() {
-        if (task.record() == null)
-            throw new IllegalStateException("this should not happen as topic() should only be called while a record is processed");
-
-        return task.record().topic();
-    }
-
-    /**
-     * @throws IllegalStateException if the task has no current record
-     */
-    @Override
-    public int partition() {
-        if (task.record() == null)
-            throw new IllegalStateException("this should not happen as partition() should only be called while a record is processed");
-
-        return task.record().partition();
-    }
-
-    /**
-     * @throws IllegalStateException if the task has no current record
-     */
-    @Override
-    public long offset() {
-        if (task.record() == null)
-            throw new IllegalStateException("this should not happen as offset() should only be called while a record is processed");
-
-        return task.record().offset();
-    }
-
-    /**
-     * @throws IllegalStateException if the task has no current record
-     */
-    @Override
-    public long timestamp() {
-        if (task.record() == null)
-            throw new IllegalStateException("this should not happen as timestamp() should only be called while a record is processed");
-
-        return task.record().timestamp;
-    }
-
-    @Override
-    public <K, V> void forward(K key, V value) {
-        task.forward(key, value);
-    }
-
-    @Override
-    public <K, V> void forward(K key, V value, int childIndex) {
-        task.forward(key, value, childIndex);
-    }
-
-    /**
-     * Asks the task to commit; the request is passed on via {@code needCommit()}.
-     */
-    @Override
-    public void commit() {
-        task.needCommit();
-    }
-
-    @Override
-    public void schedule(long interval) {
-        task.schedule(interval);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
deleted file mode 100644
index 6db83a1..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * A named node that wraps a {@link Processor}, keeps the list of its child nodes
- * and exposes the names of the state stores associated with it.
- */
-public class ProcessorNode<K, V> {
-
-    private final String name;
-    private final Processor<K, V> processor;
-    private final List<ProcessorNode<?, ?>> children;
-
-    // null when the node was created through the name-only constructor
-    public final Set<String> stateStores;
-
-    /**
-     * Creates a node with neither a processor nor state stores.
-     */
-    public ProcessorNode(String name) {
-        this(name, null, null);
-    }
-
-    /**
-     * @param name        the node name
-     * @param processor   the processor the lifecycle calls are delegated to
-     * @param stateStores the state store names associated with this node
-     */
-    public ProcessorNode(String name, Processor<K, V> processor, Set<String> stateStores) {
-        this.children = new ArrayList<>();
-        this.stateStores = stateStores;
-        this.processor = processor;
-        this.name = name;
-    }
-
-    public final String name() {
-        return this.name;
-    }
-
-    public final Processor processor() {
-        return this.processor;
-    }
-
-    /**
-     * @return the live, mutable list of children of this node
-     */
-    public final List<ProcessorNode<?, ?>> children() {
-        return this.children;
-    }
-
-    public void addChild(ProcessorNode<?, ?> child) {
-        this.children.add(child);
-    }
-
-    // the three lifecycle calls below are handed straight to the wrapped processor
-
-    public void init(ProcessorContext context) {
-        this.processor.init(context);
-    }
-
-    public void process(K key, V value) {
-        this.processor.process(key, value);
-    }
-
-    public void close() {
-        this.processor.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
deleted file mode 100644
index 3cb9cea..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.state.OffsetCheckpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Manages the state stores of one task partition: it locks the task's state directory,
- * restores registered stores from their change log topics, and writes a checkpoint
- * file with change log offsets on close.
- */
-public class ProcessorStateManager {
-
-    private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
-
-    public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
-    public static final String LOCK_FILE_NAME = ".lock";
-
-    private final int partition;
-    private final File baseDir;
-    private final FileLock directoryLock;
-    private final Map<String, StateStore> stores;
-    private final Consumer<byte[], byte[]> restoreConsumer;
-    // consumer positions reached after restoring each change log partition
-    private final Map<TopicPartition, Long> restoredOffsets;
-    // offsets loaded from the checkpoint file at construction time
-    private final Map<TopicPartition, Long> checkpointedOffsets;
-
-    /**
-     * Creates the state directory if needed, locks it, then loads and deletes the checkpoint file.
-     *
-     * @param partition       the partition id used for every store's change log partition
-     * @param baseDir         the state directory of this task
-     * @param restoreConsumer the consumer used to read change log topics during restoration
-     * @throws IOException if the directory cannot be created or locked, or the checkpoint cannot be read
-     */
-    public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer) throws IOException {
-        this.partition = partition;
-        this.baseDir = baseDir;
-        this.stores = new HashMap<>();
-        this.restoreConsumer = restoreConsumer;
-        this.restoredOffsets = new HashMap<>();
-
-        // create the state directory for this task if missing (we won't create the parent directory)
-        createStateDirectory(baseDir);
-
-        // try to acquire the exclusive lock on the state directory
-        directoryLock = lockStateDirectory(baseDir);
-        if (directoryLock == null) {
-            throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath());
-        }
-
-        // load the checkpoint information
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
-        this.checkpointedOffsets = new HashMap<>(checkpoint.read());
-
-        // delete the checkpoint file after finish loading its stored offsets
-        checkpoint.delete();
-    }
-
-    // Creates the directory (not its parents) and reports a failure explicitly instead of
-    // letting it surface later as an unrelated error on the lock file.
-    private static void createStateDirectory(File stateDir) throws IOException {
-        // the isDirectory() re-check tolerates the directory being created concurrently
-        if (!stateDir.exists() && !stateDir.mkdir() && !stateDir.isDirectory()) {
-            throw new IOException("Failed to create the state directory: " + stateDir.getAbsolutePath());
-        }
-    }
-
-    /**
-     * Tries to acquire an exclusive lock on the lock file inside the given directory.
-     *
-     * @param stateDir the state directory to lock
-     * @return the acquired lock, or null if the lock is already held
-     * @throws IOException if the lock file cannot be opened or locking fails
-     */
-    public static FileLock lockStateDirectory(File stateDir) throws IOException {
-        File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
-        FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
-        FileLock lock;
-        try {
-            lock = channel.tryLock();
-        } catch (OverlappingFileLockException e) {
-            // The lock is held inside this JVM through another channel. The channel is
-            // deliberately left open here: on some platforms closing any channel of a file
-            // releases every lock this JVM holds on that file.
-            return null;
-        } catch (IOException e) {
-            // nothing was acquired, so do not leak the file descriptor
-            channel.close();
-            throw e;
-        }
-
-        if (lock == null) {
-            // held by another program; this channel is of no further use
-            channel.close();
-        }
-        return lock;
-    }
-
-    public File baseDir() {
-        return this.baseDir;
-    }
-
-    /**
-     * Registers a store and restores its content by replaying its change log partition
-     * through the given callback.
-     *
-     * @param store                the store to register; its name is also its change log topic name
-     * @param stateRestoreCallback receives the key and value of every replayed change log record
-     * @throws IllegalArgumentException if the store name is reserved or already registered
-     * @throws IllegalStateException    if the change log topic or partition is missing, the restore
-     *                                  consumer already has a subscription, or the log end moved
-     */
-    public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
-        if (store.name().equals(CHECKPOINT_FILE_NAME))
-            throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
-
-        if (this.stores.containsKey(store.name()))
-            throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
-
-        // ---- register the store ---- //
-
-        // check that the underlying change log topic exist or not
-        if (restoreConsumer.listTopics().containsKey(store.name())) {
-            boolean partitionNotFound = true;
-            for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(store.name())) {
-                if (partitionInfo.partition() == partition) {
-                    partitionNotFound = false;
-                    break;
-                }
-            }
-
-            if (partitionNotFound)
-                throw new IllegalStateException("Store " + store.name() + "'s change log does not contain the partition " + partition);
-
-        } else {
-            throw new IllegalStateException("Change log topic for store " + store.name() + " does not exist yet");
-        }
-
-        this.stores.put(store.name(), store);
-
-        // ---- try to restore the state from change-log ---- //
-
-        // subscribe to the store's partition
-        TopicPartition storePartition = new TopicPartition(store.name(), partition);
-        if (!restoreConsumer.subscription().isEmpty()) {
-            throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
-        }
-        restoreConsumer.assign(Collections.singletonList(storePartition));
-
-        // calculate the end offset of the partition
-        // TODO: this is a bit hacky to first seek then position to get the end offset
-        restoreConsumer.seekToEnd(storePartition);
-        long endOffset = restoreConsumer.position(storePartition);
-
-        // restore from the checkpointed offset of the change log if it is persistent and the offset exists;
-        // restore the state from the beginning of the change log otherwise
-        if (checkpointedOffsets.containsKey(storePartition) && store.persistent()) {
-            restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
-        } else {
-            restoreConsumer.seekToBeginning(storePartition);
-        }
-
-        // restore its state from changelog records; while restoring the log end offset
-        // should not change since it is only written by this thread.
-        while (true) {
-            for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
-                stateRestoreCallback.restore(record.key(), record.value());
-            }
-
-            if (restoreConsumer.position(storePartition) == endOffset) {
-                break;
-            } else if (restoreConsumer.position(storePartition) > endOffset) {
-                throw new IllegalStateException("Log end offset should not change while restoring");
-            }
-        }
-
-        // record the restored offset for its change log partition
-        long newOffset = restoreConsumer.position(storePartition);
-        restoredOffsets.put(storePartition, newOffset);
-
-        // un-assign the change log partition
-        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
-    }
-
-    /**
-     * @return the registered store with the given name, or null if there is none
-     */
-    public StateStore getStore(String name) {
-        return stores.get(name);
-    }
-
-    /**
-     * Deletes entries of the state directory that do not belong to a registered store.
-     * The lock file is kept because the directory lock is held on it.
-     */
-    public void cleanup() throws IOException {
-        // listFiles() returns null when the path is not a directory or cannot be read
-        File[] files = this.baseDir.listFiles();
-        if (files == null)
-            return;
-
-        // clean up any unknown files in the state directory
-        for (File file : files) {
-            String fileName = file.getName();
-
-            // removing the lock file while holding its lock would let a new lock file be created and locked
-            if (LOCK_FILE_NAME.equals(fileName))
-                continue;
-
-            if (!this.stores.containsKey(fileName)) {
-                log.info("Deleting state directory {}", file.getAbsolutePath());
-                file.delete();
-            }
-        }
-    }
-
-    /**
-     * Flushes every registered store.
-     */
-    public void flush() {
-        if (!this.stores.isEmpty()) {
-            log.debug("Flushing stores.");
-            for (StateStore store : this.stores.values())
-                store.flush();
-        }
-    }
-
-    /**
-     * Flushes and closes all stores, writes the checkpoint file for persistent stores and
-     * releases the state directory lock. The lock is released even if closing a store or
-     * writing the checkpoint fails.
-     *
-     * @param ackedOffsets the last acknowledged offset per change log partition
-     * @throws IOException if writing the checkpoint or releasing the lock fails
-     */
-    public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
-        try {
-            if (!stores.isEmpty()) {
-                log.debug("Closing stores.");
-                for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
-                    log.debug("Closing storage engine {}", entry.getKey());
-                    entry.getValue().flush();
-                    entry.getValue().close();
-                }
-
-                Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
-                for (String storeName : stores.keySet()) {
-                    TopicPartition part = new TopicPartition(storeName, partition);
-
-                    // only checkpoint the offset to the offsets file if it is persistent;
-                    if (stores.get(storeName).persistent()) {
-                        Long offset = ackedOffsets.get(part);
-
-                        if (offset != null) {
-                            // store the last acked offset + 1 (the log position to resume from)
-                            checkpointOffsets.put(part, offset + 1);
-                        } else {
-                            // if no record was produced. we need to check the restored offset.
-                            // It was taken from the consumer position, so it already is the
-                            // position to resume from and must not be incremented again.
-                            offset = restoredOffsets.get(part);
-                            if (offset != null)
-                                checkpointOffsets.put(part, offset);
-                        }
-                    }
-                }
-
-                // write the checkpoint file before closing, to indicate clean shutdown
-                OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
-                checkpoint.write(checkpointOffsets);
-            }
-        } finally {
-            // release the state directory directoryLock, then close the channel it was
-            // acquired on so the lock file's descriptor is not leaked
-            try {
-                directoryLock.release();
-            } finally {
-                directoryLock.channel().close();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
deleted file mode 100644
index a70aa70..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Read-only holder of the pieces of a processor topology: its processor nodes,
- * its source nodes keyed by topic, and its state store suppliers.
- */
-public class ProcessorTopology {
-
-    private final List<ProcessorNode> processorNodes;
-    private final Map<String, SourceNode> sourceByTopics;
-    private final List<StateStoreSupplier> stateStoreSuppliers;
-
-    /**
-     * Wraps the given collections in unmodifiable views; the collections are not copied.
-     */
-    public ProcessorTopology(List<ProcessorNode> processorNodes,
-                             Map<String, SourceNode> sourceByTopics,
-                             List<StateStoreSupplier> stateStoreSuppliers) {
-        List<ProcessorNode> nodesView = Collections.unmodifiableList(processorNodes);
-        Map<String, SourceNode> sourcesView = Collections.unmodifiableMap(sourceByTopics);
-        List<StateStoreSupplier> suppliersView = Collections.unmodifiableList(stateStoreSuppliers);
-
-        this.processorNodes = nodesView;
-        this.sourceByTopics = sourcesView;
-        this.stateStoreSuppliers = suppliersView;
-    }
-
-    /**
-     * @return the topics that have a source node
-     */
-    public Set<String> sourceTopics() {
-        return this.sourceByTopics.keySet();
-    }
-
-    /**
-     * @return the source node of the given topic, or null if the topic has none
-     */
-    public SourceNode source(String topic) {
-        return this.sourceByTopics.get(topic);
-    }
-
-    /**
-     * @return a new set holding the distinct source nodes
-     */
-    public Set<SourceNode> sources() {
-        Set<SourceNode> distinct = new HashSet<>();
-        distinct.addAll(this.sourceByTopics.values());
-        return distinct;
-    }
-
-    public List<ProcessorNode> processors() {
-        return this.processorNodes;
-    }
-
-    public List<StateStoreSupplier> stateStoreSuppliers() {
-        return this.stateStoreSuppliers;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
deleted file mode 100644
index b4b7afe..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import java.util.PriorityQueue;
-
-public class PunctuationQueue {
-
-    private PriorityQueue<PunctuationSchedule> pq = new PriorityQueue<>();
-
-    public void schedule(PunctuationSchedule sched) {
-        synchronized (pq) {
-            pq.add(sched);
-        }
-    }
-
-    public void close() {
-        synchronized (pq) {
-            pq.clear();
-        }
-    }
-
-    public boolean mayPunctuate(long timestamp, Punctuator punctuator) {
-        synchronized (pq) {
-            boolean punctuated = false;
-            PunctuationSchedule top = pq.peek();
-            while (top != null && top.timestamp <= timestamp) {
-                PunctuationSchedule sched = top;
-                pq.poll();
-                punctuator.punctuate(sched.node(), timestamp);
-                pq.add(sched.next());
-                punctuated = true;
-
-                top = pq.peek();
-            }
-
-            return punctuated;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
deleted file mode 100644
index dc9a50d..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-public class PunctuationSchedule extends Stamped<ProcessorNode> {
-
-    final long interval;
-
-    public PunctuationSchedule(ProcessorNode node, long interval) {
-        this(node, System.currentTimeMillis(), interval);
-    }
-
-    public PunctuationSchedule(ProcessorNode node, long time, long interval) {
-        super(node, time + interval);
-        this.interval = interval;
-    }
-
-    public ProcessorNode node() {
-        return value;
-    }
-
-    public PunctuationSchedule next() {
-        return new PunctuationSchedule(value, timestamp, interval);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java
deleted file mode 100644
index d99e2ae..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Punctuator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-public interface Punctuator {
-
-    void punctuate(ProcessorNode node, long streamTime);
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
deleted file mode 100644
index 087cbd2..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import java.util.HashMap;
-import java.util.NoSuchElementException;
-
-public class QuickUnion<T> {
-
-    private HashMap<T, T> ids = new HashMap<>();
-
-    public void add(T id) {
-        ids.put(id, id);
-    }
-
-    public boolean exists(T id) {
-        return ids.containsKey(id);
-    }
-
-    public T root(T id) {
-        T current = id;
-        T parent = ids.get(current);
-
-        if (parent == null)
-            throw new NoSuchElementException("id: " + id.toString());
-
-        while (!parent.equals(current)) {
-            // do the path compression
-            T grandparent = ids.get(parent);
-            ids.put(current, grandparent);
-
-            current = parent;
-            parent = grandparent;
-        }
-        return current;
-    }
-
-    public void unite(T id1, T... idList) {
-        for (T id2 : idList) {
-            unitePair(id1, id2);
-        }
-    }
-
-    private void unitePair(T id1, T id2) {
-        T root1 = root(id1);
-        T root2 = root(id2);
-
-        if (!root1.equals(root2))
-            ids.put(root1, root2);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
deleted file mode 100644
index f0dbf35..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class RecordCollector {
-
-    /**
-     * A supplier of a {@link RecordCollector} instance.
-     */
-    public static interface Supplier {
-        /**
-         * Get the record collector.
-         * @return the record collector
-         */
-        public RecordCollector recordCollector();
-    }
-
-    private static final Logger log = LoggerFactory.getLogger(RecordCollector.class);
-
-    private final Producer<byte[], byte[]> producer;
-    private final Map<TopicPartition, Long> offsets;
-    private final Callback callback = new Callback() {
-        public void onCompletion(RecordMetadata metadata, Exception exception) {
-            if (exception == null) {
-                TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
-                offsets.put(tp, metadata.offset());
-            } else {
-                log.error("Error sending record: ", exception);
-            }
-        }
-    };
-
-
-    public RecordCollector(Producer<byte[], byte[]> producer) {
-        this.producer = producer;
-        this.offsets = new HashMap<>();
-    }
-
-    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
-        byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
-        byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
-        this.producer.send(new ProducerRecord<>(record.topic(), keyBytes, valBytes), callback);
-    }
-
-    public void flush() {
-        this.producer.flush();
-    }
-
-    /**
-     * Closes this RecordCollector
-     */
-    public void close() {
-        producer.close();
-    }
-
-    /**
-     * The last ack'd offset from the producer
-     *
-     * @return the map from TopicPartition to offset
-     */
-    Map<TopicPartition, Long> offsets() {
-        return this.offsets;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
deleted file mode 100644
index 66f78d2..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-import java.util.ArrayDeque;
-
-/**
- * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the
- * partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition
- * timestamp is monotonically increasing such that once it is advanced, it will not be decremented.
- */
-public class RecordQueue {
-
-    private final SourceNode source;
-    private final TopicPartition partition;
-    private final ArrayDeque<StampedRecord> fifoQueue;
-    private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
-
-    private long partitionTime = TimestampTracker.NOT_KNOWN;
-
-    public RecordQueue(TopicPartition partition, SourceNode source) {
-        this.partition = partition;
-        this.source = source;
-
-        this.fifoQueue = new ArrayDeque<>();
-        this.timeTracker = new MinTimestampTracker<>();
-    }
-
-    /**
-     * Returns the corresponding source node in the topology
-     *
-     * @return SourceNode
-     */
-    public SourceNode source() {
-        return source;
-    }
-
-    /**
-     * Returns the partition with which this queue is associated
-     *
-     * @return TopicPartition
-     */
-    public TopicPartition partition() {
-        return partition;
-    }
-
-    /**
-     * Add a batch of {@link ConsumerRecord} into the queue
-     *
-     * @param rawRecords the raw records
-     * @param timestampExtractor TimestampExtractor
-     * @return the size of this queue
-     */
-    public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords, TimestampExtractor timestampExtractor) {
-        for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-            // deserialize the raw record, extract the timestamp and put into the queue
-            Object key = source.deserializeKey(rawRecord.topic(), rawRecord.key());
-            Object value = source.deserializeValue(rawRecord.topic(), rawRecord.value());
-
-            ConsumerRecord<Object, Object> record = new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), key, value);
-            long timestamp = timestampExtractor.extract(record);
-
-            StampedRecord stampedRecord = new StampedRecord(record, timestamp);
-
-            fifoQueue.addLast(stampedRecord);
-            timeTracker.addElement(stampedRecord);
-        }
-
-        return size();
-    }
-
-    /**
-     * Get the next {@link StampedRecord} from the queue
-     *
-     * @return StampedRecord
-     */
-    public StampedRecord poll() {
-        StampedRecord elem = fifoQueue.pollFirst();
-
-        if (elem == null)
-            return null;
-
-        timeTracker.removeElement(elem);
-
-        // only advance the partition timestamp if its currently
-        // tracked min timestamp has exceeded its value
-        long timestamp = timeTracker.get();
-
-        if (timestamp > partitionTime)
-            partitionTime = timestamp;
-
-        return elem;
-    }
-
-    /**
-     * Returns the number of records in the queue
-     *
-     * @return the number of records
-     */
-    public int size() {
-        return fifoQueue.size();
-    }
-
-    /**
-     * Tests if the queue is empty
-     *
-     * @return true if the queue is empty, otherwise false
-     */
-    public boolean isEmpty() {
-        return fifoQueue.isEmpty();
-    }
-
-    /**
-     * Returns the tracked partition timestamp
-     *
-     * @return timestamp
-     */
-    public long timestamp() {
-        return partitionTime;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
deleted file mode 100644
index 9f01727..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-public class SinkNode<K, V> extends ProcessorNode<K, V> {
-
-    private final String topic;
-    private Serializer<K> keySerializer;
-    private Serializer<V> valSerializer;
-
-    private ProcessorContext context;
-
-    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
-        super(name);
-
-        this.topic = topic;
-        this.keySerializer = keySerializer;
-        this.valSerializer = valSerializer;
-    }
-
-    @Override
-    public void addChild(ProcessorNode<?, ?> child) {
-        throw new UnsupportedOperationException("sink node does not allow addChild");
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void init(ProcessorContext context) {
-        this.context = context;
-        if (this.keySerializer == null) this.keySerializer = (Serializer<K>) context.keySerializer();
-        if (this.valSerializer == null) this.valSerializer = (Serializer<V>) context.valueSerializer();
-    }
-
-    @Override
-    public void process(K key, V value) {
-        // send to all the registered topics
-        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-        collector.send(new ProducerRecord<>(topic, key, value), keySerializer, valSerializer);
-    }
-
-    @Override
-    public void close() {
-        // do nothing
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
deleted file mode 100644
index fa4afaf..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-public class SourceNode<K, V> extends ProcessorNode<K, V> {
-
-    private Deserializer<K> keyDeserializer;
-    private Deserializer<V> valDeserializer;
-    private ProcessorContext context;
-
-    public SourceNode(String name, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
-        super(name);
-
-        this.keyDeserializer = keyDeserializer;
-        this.valDeserializer = valDeserializer;
-    }
-
-    public K deserializeKey(String topic, byte[] data) {
-        return keyDeserializer.deserialize(topic, data);
-    }
-
-    public V deserializeValue(String topic, byte[] data) {
-        return valDeserializer.deserialize(topic, data);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void init(ProcessorContext context) {
-        this.context = context;
-
-        // if serializers are null, get the default ones from the context
-        if (this.keyDeserializer == null) this.keyDeserializer = (Deserializer<K>) context.keyDeserializer();
-        if (this.valDeserializer == null) this.valDeserializer = (Deserializer<V>) context.valueDeserializer();
-    }
-
-    @Override
-    public void process(K key, V value) {
-        context.forward(key, value);
-    }
-
-    @Override
-    public void close() {
-        // do nothing
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
deleted file mode 100644
index 4e44667..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Stamped.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-public class Stamped<V> implements Comparable {
-
-    public final V value;
-    public final long timestamp;
-
-    public Stamped(V value, long timestamp) {
-        this.value = value;
-        this.timestamp = timestamp;
-    }
-
-    @Override
-    public int compareTo(Object other) {
-        long otherTimestamp = ((Stamped<?>) other).timestamp;
-
-        if (timestamp < otherTimestamp) return -1;
-        else if (timestamp > otherTimestamp) return 1;
-        return 0;
-    }
-}