You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 01:27:31 UTC

[5/8] kafka git commit: MINOR: remove Kafka Streams in 0.9.0

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
deleted file mode 100644
index febd938..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-public class StampedRecord extends Stamped<ConsumerRecord<Object, Object>> {
-
-    public StampedRecord(ConsumerRecord<Object, Object> record, long timestamp) {
-        super(record, timestamp);
-    }
-
-    public String topic() {
-        return value.topic();
-    }
-
-    public int partition() {
-        return value.partition();
-    }
-
-    public Object key() {
-        return value.key();
-    }
-
-    public Object value() {
-        return value.value();
-    }
-
-    public long offset() {
-        return value.offset();
-    }
-
-    @Override
-    public String toString() {
-        return value.toString() + ", timestamp = " + timestamp;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
deleted file mode 100644
index a9c14e5..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
- */
-public class StreamTask implements Punctuator {
-
-    private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
-
-    private final TaskId id;
-    private final int maxBufferedSize;
-
-    private final Consumer consumer;
-    private final PartitionGroup partitionGroup;
-    private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
-    private final PunctuationQueue punctuationQueue;
-    private final ProcessorContextImpl processorContext;
-    private final ProcessorTopology topology;
-
-    private final Map<TopicPartition, Long> consumedOffsets;
-    private final RecordCollector recordCollector;
-    private final ProcessorStateManager stateMgr;
-
-    private boolean commitRequested = false;
-    private boolean commitOffsetNeeded = false;
-    private StampedRecord currRecord = null;
-    private ProcessorNode currNode = null;
-
-    private boolean requiresPoll = true;
-
-    /**
-     * Create {@link StreamTask} with its assigned partitions
-     *
-     * @param id                    the ID of this task
-     * @param consumer              the instance of {@link Consumer}
-     * @param producer              the instance of {@link Producer}
-     * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
-     * @param partitions            the collection of assigned {@link TopicPartition}
-     * @param topology              the instance of {@link ProcessorTopology}
-     * @param config                the {@link StreamingConfig} specified by the user
-     * @param metrics               the {@link StreamingMetrics} created by the thread
-     */
-    public StreamTask(TaskId id,
-                      Consumer<byte[], byte[]> consumer,
-                      Producer<byte[], byte[]> producer,
-                      Consumer<byte[], byte[]> restoreConsumer,
-                      Collection<TopicPartition> partitions,
-                      ProcessorTopology topology,
-                      StreamingConfig config,
-                      StreamingMetrics metrics) {
-
-        this.id = id;
-        this.consumer = consumer;
-        this.punctuationQueue = new PunctuationQueue();
-        this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
-        this.topology = topology;
-
-        // create queues for each assigned partition and associate them
-        // to corresponding source nodes in the processor topology
-        Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
-
-        for (TopicPartition partition : partitions) {
-            SourceNode source = topology.source(partition.topic());
-            RecordQueue queue = createRecordQueue(partition, source);
-            partitionQueues.put(partition, queue);
-        }
-
-        TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
-        this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
-
-        // initialize the consumed and produced offset cache
-        this.consumedOffsets = new HashMap<>();
-
-        // create the record recordCollector that maintains the produced offsets
-        this.recordCollector = new RecordCollector(producer);
-
-        log.info("Creating restoration consumer client for stream task #" + id());
-
-        // create the processor state manager
-        try {
-            File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
-            this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer);
-        } catch (IOException e) {
-            throw new KafkaException("Error while creating the state manager", e);
-        }
-
-        // initialize the topology with its own context
-        this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics);
-
-        // initialize the state stores
-        for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
-            StateStore store = stateStoreSupplier.get();
-            store.init(this.processorContext);
-        }
-
-        // initialize the task by initializing all its processor nodes in the topology
-        for (ProcessorNode node : this.topology.processors()) {
-            this.currNode = node;
-            try {
-                node.init(this.processorContext);
-            } finally {
-                this.currNode = null;
-            }
-        }
-
-        this.processorContext.initialized();
-    }
-
-    public TaskId id() {
-        return id;
-    }
-
-    public Set<TopicPartition> partitions() {
-        return this.partitionGroup.partitions();
-    }
-
-    /**
-     * Adds records to queues
-     *
-     * @param partition the partition
-     * @param records  the records
-     */
-    @SuppressWarnings("unchecked")
-    public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
-        int queueSize = partitionGroup.addRawRecords(partition, records);
-
-        // if after adding these records, its partition queue's buffered size has been
-        // increased beyond the threshold, we can then pause the consumption for this partition
-        if (queueSize > this.maxBufferedSize) {
-            consumer.pause(partition);
-        }
-    }
-
-    /**
-     * Process one record
-     *
-     * @return number of records left in the buffer of this task's partition group after the processing is done
-     */
-    @SuppressWarnings("unchecked")
-    public int process() {
-        synchronized (this) {
-            // get the next record to process
-            StampedRecord record = partitionGroup.nextRecord(recordInfo);
-
-            // if there is no record to process, return immediately
-            if (record == null) {
-                requiresPoll = true;
-                return 0;
-            }
-
-            requiresPoll = false;
-
-            try {
-                // process the record by passing to the source node of the topology
-                this.currRecord = record;
-                this.currNode = recordInfo.node();
-                TopicPartition partition = recordInfo.partition();
-
-                log.debug("Start processing one record [" + currRecord + "]");
-
-                this.currNode.process(currRecord.key(), currRecord.value());
-
-                log.debug("Completed processing one record [" + currRecord + "]");
-
-                // update the consumed offset map after processing is done
-                consumedOffsets.put(partition, currRecord.offset());
-                commitOffsetNeeded = true;
-
-                // after processing this record, if its partition queue's buffered size has been
-                // decreased to the threshold, we can then resume the consumption on this partition
-                if (partitionGroup.numBuffered(partition) == this.maxBufferedSize) {
-                    consumer.resume(partition);
-                    requiresPoll = true;
-                }
-
-                if (partitionGroup.topQueueSize() <= this.maxBufferedSize) {
-                    requiresPoll = true;
-                }
-            } finally {
-                this.currRecord = null;
-                this.currNode = null;
-            }
-
-            return partitionGroup.numBuffered();
-        }
-    }
-
-    public boolean requiresPoll() {
-        return requiresPoll;
-    }
-
-    /**
-     * Possibly trigger registered punctuation functions if
-     * current time has reached the defined stamp
-     *
-     * @param timestamp
-     */
-    public boolean maybePunctuate(long timestamp) {
-        return punctuationQueue.mayPunctuate(timestamp, this);
-    }
-
-    @Override
-    public void punctuate(ProcessorNode node, long timestamp) {
-        if (currNode != null)
-            throw new IllegalStateException("Current node is not null");
-
-        currNode = node;
-        try {
-            node.processor().punctuate(timestamp);
-        } finally {
-            currNode = null;
-        }
-    }
-
-    public StampedRecord record() {
-        return this.currRecord;
-    }
-
-    public ProcessorNode node() {
-        return this.currNode;
-    }
-
-    public ProcessorTopology topology() {
-        return this.topology;
-    }
-
-    /**
-     * Commit the current task state
-     */
-    public void commit() {
-        // 1) flush local state
-        stateMgr.flush();
-
-        // 2) flush produced records in the downstream and change logs of local states
-        recordCollector.flush();
-
-        // 3) commit consumed offsets if it is dirty already
-        if (commitOffsetNeeded) {
-            Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
-            for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
-                consumedOffsetsAndMetadata.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1L));
-            }
-            consumer.commitSync(consumedOffsetsAndMetadata);
-            commitOffsetNeeded = false;
-        }
-
-        commitRequested = false;
-    }
-
-    /**
-     * Whether or not a request has been made to commit the current state
-     */
-    public boolean commitNeeded() {
-        return this.commitRequested;
-    }
-
-    /**
-     * Request committing the current task's state
-     */
-    public void needCommit() {
-        this.commitRequested = true;
-    }
-
-    /**
-     * Schedules a punctuation for the processor
-     *
-     * @param interval  the interval in milliseconds
-     */
-    public void schedule(long interval) {
-        if (currNode == null)
-            throw new IllegalStateException("Current node is null");
-
-        punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
-    }
-
-    public void close() {
-        this.partitionGroup.close();
-        this.consumedOffsets.clear();
-
-        // close the processors
-        // make sure close() is called for each node even when there is a RuntimeException
-        RuntimeException exception = null;
-        for (ProcessorNode node : this.topology.processors()) {
-            currNode = node;
-            try {
-                node.close();
-            } catch (RuntimeException e) {
-                exception = e;
-            } finally {
-                currNode = null;
-            }
-        }
-
-        if (exception != null)
-            throw exception;
-
-        try {
-            stateMgr.close(recordCollector.offsets());
-        } catch (IOException e) {
-            throw new KafkaException("Error while closing the state manager in processor context", e);
-        }
-    }
-
-    private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) {
-        return new RecordQueue(partition, source);
-    }
-
-    @SuppressWarnings("unchecked")
-    public <K, V> void forward(K key, V value) {
-        ProcessorNode thisNode = currNode;
-        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
-            currNode = childNode;
-            try {
-                childNode.process(key, value);
-            } finally {
-                currNode = thisNode;
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    public <K, V> void forward(K key, V value, int childIndex) {
-        ProcessorNode thisNode = currNode;
-        ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex);
-        currNode = childNode;
-        try {
-            childNode.process(key, value);
-        } finally {
-            currNode = thisNode;
-        }
-    }
-
-    public ProcessorContext context() {
-        return processorContext;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
deleted file mode 100644
index ba81421..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.MeasurableStat;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.PartitionGrouper;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.channels.FileLock;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class StreamThread extends Thread {
-
-    private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
-    private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1);
-
-    private final AtomicBoolean running;
-
-    protected final StreamingConfig config;
-    protected final TopologyBuilder builder;
-    protected final PartitionGrouper partitionGrouper;
-    protected final Producer<byte[], byte[]> producer;
-    protected final Consumer<byte[], byte[]> consumer;
-    protected final Consumer<byte[], byte[]> restoreConsumer;
-
-    private final Map<TaskId, StreamTask> tasks;
-    private final String clientId;
-    private final Time time;
-    private final File stateDir;
-    private final long pollTimeMs;
-    private final long cleanTimeMs;
-    private final long commitTimeMs;
-    private final long totalRecordsToProcess;
-    private final StreamingMetricsImpl sensors;
-
-    private long lastClean;
-    private long lastCommit;
-    private long recordsProcessed;
-
-    final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
-        @Override
-        public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
-            addPartitions(assignment);
-            lastClean = time.milliseconds(); // start the cleaning cycle
-        }
-
-        @Override
-        public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
-            commitAll();
-            removePartitions();
-            lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
-        }
-    };
-
-    public StreamThread(TopologyBuilder builder,
-                        StreamingConfig config,
-                        String clientId,
-                        Metrics metrics,
-                        Time time) throws Exception {
-        this(builder, config, null , null, null, clientId, metrics, time);
-    }
-
-    StreamThread(TopologyBuilder builder,
-                 StreamingConfig config,
-                 Producer<byte[], byte[]> producer,
-                 Consumer<byte[], byte[]> consumer,
-                 Consumer<byte[], byte[]> restoreConsumer,
-                 String clientId,
-                 Metrics metrics,
-                 Time time) throws Exception {
-        super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement());
-
-        this.config = config;
-        this.builder = builder;
-        this.clientId = clientId;
-        this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
-        this.partitionGrouper.topicGroups(builder.topicGroups());
-
-        // set the producer and consumer clients
-        this.producer = (producer != null) ? producer : createProducer();
-        this.consumer = (consumer != null) ? consumer : createConsumer();
-        this.restoreConsumer = (restoreConsumer != null) ? restoreConsumer : createRestoreConsumer();
-
-        // initialize the task list
-        this.tasks = new HashMap<>();
-
-        // read in task specific config values
-        this.stateDir = new File(this.config.getString(StreamingConfig.STATE_DIR_CONFIG));
-        this.stateDir.mkdir();
-        this.pollTimeMs = config.getLong(StreamingConfig.POLL_MS_CONFIG);
-        this.commitTimeMs = config.getLong(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG);
-        this.cleanTimeMs = config.getLong(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
-        this.totalRecordsToProcess = config.getLong(StreamingConfig.TOTAL_RECORDS_TO_PROCESS);
-
-        this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
-        this.lastCommit = time.milliseconds();
-        this.recordsProcessed = 0;
-        this.time = time;
-
-        this.sensors = new StreamingMetricsImpl(metrics);
-
-        this.running = new AtomicBoolean(true);
-    }
-
-    private Producer<byte[], byte[]> createProducer() {
-        log.info("Creating producer client for stream thread [" + this.getName() + "]");
-        return new KafkaProducer<>(config.getProducerConfigs(),
-                new ByteArraySerializer(),
-                new ByteArraySerializer());
-    }
-
-    private Consumer<byte[], byte[]> createConsumer() {
-        log.info("Creating consumer client for stream thread [" + this.getName() + "]");
-        return new KafkaConsumer<>(config.getConsumerConfigs(partitionGrouper),
-                new ByteArrayDeserializer(),
-                new ByteArrayDeserializer());
-    }
-
-    private Consumer<byte[], byte[]> createRestoreConsumer() {
-        log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
-        return new KafkaConsumer<>(config.getConsumerConfigs(),
-                new ByteArrayDeserializer(),
-                new ByteArrayDeserializer());
-    }
-
-    /**
-     * Execute the stream processors
-     */
-    @Override
-    public void run() {
-        log.info("Starting stream thread [" + this.getName() + "]");
-
-        try {
-            runLoop();
-        } catch (RuntimeException e) {
-            log.error("Uncaught error during processing in thread [" + this.getName() + "]: ", e);
-            throw e;
-        } finally {
-            shutdown();
-        }
-    }
-
-    /**
-     * Shutdown this streaming thread.
-     */
-    public void close() {
-        running.set(false);
-    }
-
-    public Map<TaskId, StreamTask> tasks() {
-        return Collections.unmodifiableMap(tasks);
-    }
-
-    private void shutdown() {
-        log.info("Shutting down stream thread [" + this.getName() + "]");
-
-        // Exceptions should not prevent this call from going through all shutdown steps.
-        try {
-            commitAll();
-        } catch (Throwable e) {
-            // already logged in commitAll()
-        }
-        try {
-            producer.close();
-        } catch (Throwable e) {
-            log.error("Failed to close producer in thread [" + this.getName() + "]: ", e);
-        }
-        try {
-            consumer.close();
-        } catch (Throwable e) {
-            log.error("Failed to close consumer in thread [" + this.getName() + "]: ", e);
-        }
-        try {
-            restoreConsumer.close();
-        } catch (Throwable e) {
-            log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
-        }
-        try {
-            removePartitions();
-        } catch (Throwable e) {
-            // already logged in removePartition()
-        }
-
-        log.info("Stream thread shutdown complete [" + this.getName() + "]");
-    }
-
-    private void runLoop() {
-        try {
-            int totalNumBuffered = 0;
-            boolean requiresPoll = true;
-
-            ensureCopartitioning(builder.copartitionGroups());
-
-            consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener);
-
-            while (stillRunning()) {
-                // try to fetch some records if necessary
-                if (requiresPoll) {
-                    long startPoll = time.milliseconds();
-
-                    ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
-
-                    if (!records.isEmpty()) {
-                        for (StreamTask task : tasks.values()) {
-                            for (TopicPartition partition : task.partitions()) {
-                                task.addRecords(partition, records.records(partition));
-                            }
-                        }
-                    }
-
-                    long endPoll = time.milliseconds();
-                    sensors.pollTimeSensor.record(endPoll - startPoll);
-                }
-
-                totalNumBuffered = 0;
-
-                if (!tasks.isEmpty()) {
-                    // try to process one record from each task
-                    requiresPoll = false;
-
-                    for (StreamTask task : tasks.values()) {
-                        long startProcess = time.milliseconds();
-
-                        totalNumBuffered += task.process();
-                        requiresPoll = requiresPoll || task.requiresPoll();
-
-                        sensors.processTimeSensor.record(time.milliseconds() - startProcess);
-                    }
-
-                    maybePunctuate();
-                    maybeCommit();
-                } else {
-                    // even when no task is assigned, we must poll to get a task.
-                    requiresPoll = true;
-                }
-
-                maybeClean();
-            }
-        } catch (Exception e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    private boolean stillRunning() {
-        if (!running.get()) {
-            log.debug("Shutting down at user request.");
-            return false;
-        }
-
-        if (totalRecordsToProcess >= 0 && recordsProcessed >= totalRecordsToProcess) {
-            log.debug("Shutting down as we've reached the user configured limit of {} records to process.", totalRecordsToProcess);
-            return false;
-        }
-
-        return true;
-    }
-
-    private void maybePunctuate() {
-        for (StreamTask task : tasks.values()) {
-            try {
-                long now = time.milliseconds();
-
-                if (task.maybePunctuate(now))
-                    sensors.punctuateTimeSensor.record(time.milliseconds() - now);
-
-            } catch (Exception e) {
-                log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-                throw e;
-            }
-        }
-    }
-
-    protected void maybeCommit() {
-        long now = time.milliseconds();
-
-        if (commitTimeMs >= 0 && lastCommit + commitTimeMs < now) {
-            log.trace("Committing processor instances because the commit interval has elapsed.");
-
-            commitAll();
-            lastCommit = now;
-        } else {
-            for (StreamTask task : tasks.values()) {
-                try {
-                    if (task.commitNeeded())
-                        commitOne(task, time.milliseconds());
-                } catch (Exception e) {
-                    log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-                    throw e;
-                }
-            }
-        }
-    }
-
-    /**
-     * Commit the states of all its tasks
-     */
-    private void commitAll() {
-        for (StreamTask task : tasks.values()) {
-            try {
-                commitOne(task, time.milliseconds());
-            } catch (Exception e) {
-                log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-                throw e;
-            }
-        }
-    }
-
-    /**
-     * Commit the state of a task
-     */
-    private void commitOne(StreamTask task, long now) {
-        try {
-            task.commit();
-        } catch (Exception e) {
-            log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-            throw e;
-        }
-
-        sensors.commitTimeSensor.record(time.milliseconds() - now);
-    }
-
-    /**
-     * Cleanup any states of the tasks that have been removed from this thread
-     */
-    protected void maybeClean() {
-        long now = time.milliseconds();
-
-        if (now > lastClean + cleanTimeMs) {
-            File[] stateDirs = stateDir.listFiles();
-            if (stateDirs != null) {
-                for (File dir : stateDirs) {
-                    try {
-                        TaskId id = TaskId.parse(dir.getName());
-
-                        // try to acquire the exclusive lock on the state directory
-                        FileLock directoryLock = null;
-                        try {
-                            directoryLock = ProcessorStateManager.lockStateDirectory(dir);
-                            if (directoryLock != null) {
-                                log.info("Deleting obsolete state directory {} after delayed {} ms.", dir.getAbsolutePath(), cleanTimeMs);
-                                Utils.delete(dir);
-                            }
-                        } catch (IOException e) {
-                            log.error("Failed to lock the state directory due to an unexpected exception", e);
-                        } finally {
-                            if (directoryLock != null) {
-                                try {
-                                    directoryLock.release();
-                                } catch (IOException e) {
-                                    log.error("Failed to release the state directory lock");
-                                }
-                            }
-                        }
-                    } catch (TaskId.TaskIdFormatException e) {
-                        // there may be some unknown files that sits in the same directory,
-                        // we should ignore these files instead trying to delete them as well
-                    }
-                }
-            }
-
-            lastClean = now;
-        }
-    }
-
-    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
-        sensors.taskCreationSensor.record();
-
-        ProcessorTopology topology = builder.build(id.topicGroupId);
-
-        return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, topology, config, sensors);
-    }
-
-    private void addPartitions(Collection<TopicPartition> assignment) {
-
-        HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
-
-        for (TopicPartition partition : assignment) {
-            Set<TaskId> taskIds = partitionGrouper.taskIds(partition);
-            for (TaskId taskId : taskIds) {
-                Set<TopicPartition> partitions = partitionsForTask.get(taskId);
-                if (partitions == null) {
-                    partitions = new HashSet<>();
-                    partitionsForTask.put(taskId, partitions);
-                }
-                partitions.add(partition);
-            }
-        }
-
-        // create the tasks
-        for (TaskId taskId : partitionsForTask.keySet()) {
-            try {
-                tasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
-            } catch (Exception e) {
-                log.error("Failed to create a task #" + taskId + " in thread [" + this.getName() + "]: ", e);
-                throw e;
-            }
-        }
-
-        lastClean = time.milliseconds();
-    }
-
-    private void removePartitions() {
-
-        // TODO: change this clearing tasks behavior
-        for (StreamTask task : tasks.values()) {
-            log.info("Removing task {}", task.id());
-            try {
-                task.close();
-            } catch (Exception e) {
-                log.error("Failed to close a task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-                throw e;
-            }
-            sensors.taskDestructionSensor.record();
-        }
-        tasks.clear();
-    }
-
-    public PartitionGrouper partitionGrouper() {
-        return partitionGrouper;
-    }
-
-    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) {
-        for (Set<String> copartitionGroup : copartitionGroups) {
-            ensureCopartitioning(copartitionGroup);
-        }
-    }
-
-    private void ensureCopartitioning(Set<String> copartitionGroup) {
-        int numPartitions = -1;
-
-        for (String topic : copartitionGroup) {
-            List<PartitionInfo> infos = consumer.partitionsFor(topic);
-
-            if (infos == null)
-                throw new KafkaException("topic not found: " + topic);
-
-            if (numPartitions == -1) {
-                numPartitions = infos.size();
-            } else if (numPartitions != infos.size()) {
-                String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
-                Arrays.sort(topics);
-                throw new KafkaException("topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]");
-            }
-        }
-    }
-
-    private class StreamingMetricsImpl implements StreamingMetrics {
-        final Metrics metrics;
-        final String metricGrpName;
-        final Map<String, String> metricTags;
-
-        final Sensor commitTimeSensor;
-        final Sensor pollTimeSensor;
-        final Sensor processTimeSensor;
-        final Sensor punctuateTimeSensor;
-        final Sensor taskCreationSensor;
-        final Sensor taskDestructionSensor;
-
-        public StreamingMetricsImpl(Metrics metrics) {
-
-            this.metrics = metrics;
-            this.metricGrpName = "streaming-metrics";
-            this.metricTags = new LinkedHashMap<>();
-            this.metricTags.put("client-id", clientId + "-" + getName());
-
-            this.commitTimeSensor = metrics.sensor("commit-time");
-            this.commitTimeSensor.add(new MetricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
-            this.commitTimeSensor.add(new MetricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
-            this.commitTimeSensor.add(new MetricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
-
-            this.pollTimeSensor = metrics.sensor("poll-time");
-            this.pollTimeSensor.add(new MetricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
-            this.pollTimeSensor.add(new MetricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
-            this.pollTimeSensor.add(new MetricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
-
-            this.processTimeSensor = metrics.sensor("process-time");
-            this.processTimeSensor.add(new MetricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
-            this.processTimeSensor.add(new MetricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
-            this.processTimeSensor.add(new MetricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
-
-            this.punctuateTimeSensor = metrics.sensor("punctuate-time");
-            this.punctuateTimeSensor.add(new MetricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
-            this.punctuateTimeSensor.add(new MetricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
-            this.punctuateTimeSensor.add(new MetricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
-
-            this.taskCreationSensor = metrics.sensor("task-creation");
-            this.taskCreationSensor.add(new MetricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
-
-            this.taskDestructionSensor = metrics.sensor("task-destruction");
-            this.taskDestructionSensor.add(new MetricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
-        }
-
-        @Override
-        public void recordLatency(Sensor sensor, long startNs, long endNs) {
-            sensor.record((endNs - startNs) / 1000000, endNs);
-        }
-
-        @Override
-        public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
-            // extract the additional tags if there are any
-            Map<String, String> tagMap = new HashMap<>(this.metricTags);
-            if ((tags.length % 2) != 0)
-                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
-
-            for (int i = 0; i < tags.length; i += 2)
-                tagMap.put(tags[i], tags[i + 1]);
-
-            // first add the global operation metrics if not yet, with the global tags only
-            Sensor parent = metrics.sensor(operationName);
-            addLatencyMetrics(this.metricGrpName, parent, "all", operationName, this.metricTags);
-
-            // add the store operation metrics with additional tags
-            Sensor sensor = metrics.sensor(entityName + "-" + operationName, parent);
-            addLatencyMetrics("streaming-" + scopeName + "-metrics", sensor, entityName, operationName, tagMap);
-
-            return sensor;
-        }
-
-        private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
-            maybeAddMetric(sensor, new MetricName(opName + "-avg-latency-ms", metricGrpName,
-                "The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());
-            maybeAddMetric(sensor, new MetricName(opName + "-max-latency-ms", metricGrpName,
-                "The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max());
-            maybeAddMetric(sensor, new MetricName(opName + "-qps", metricGrpName,
-                "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
-        }
-
-        private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
-            if (!metrics.metrics().containsKey(name))
-                sensor.add(name, stat);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
deleted file mode 100644
index d8a012a..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-/**
- * TimestampTracker is a helper class for a sliding window implementation.
- * It is assumed that stamped elements are added or removed in a FIFO manner.
- * It maintains the timestamp, such as the min timestamp, the max timestamp, etc.
- * of stamped elements that were added but not yet removed.
- */
-public interface TimestampTracker<E> {
-
-    static final long NOT_KNOWN = -1L;
-
-    /**
-     * Adds a stamped elements to this tracker.
-     *
-     * @param elem the added element
-     */
-    void addElement(Stamped<E> elem);
-
-    /**
-     * Removed a stamped elements to this tracker.
-     *
-     * @param elem the removed element
-     */
-    void removeElement(Stamped<E> elem);
-
-    /**
-     * Returns the current tracked timestamp
-     *
-     * @return timestamp, or {@link #NOT_KNOWN} when empty
-     */
-    long get();
-
-    /**
-     * Returns the size of internal structure. The meaning of "size" depends on the implementation.
-     *
-     * @return size
-     */
-    int size();
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java b/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
deleted file mode 100644
index 183b691..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-public class Entry<K, V> {
-
-    private final K key;
-    private final V value;
-
-    public Entry(K key, V value) {
-        this.key = key;
-        this.value = value;
-    }
-
-    public K key() {
-        return key;
-    }
-
-    public V value() {
-        return value;
-    }
-
-    public String toString() {
-        return "Entry(" + key() + ", " + value() + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
deleted file mode 100644
index d1f845c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * An in-memory key-value store based on a TreeMap.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
- */
-public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
-
-    private final String name;
-    private final Serdes serdes;
-    private final Time time;
-
-    protected InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
-        this.name = name;
-        this.serdes = serdes;
-        this.time = time;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    public StateStore get() {
-        return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
-    }
-
-    private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
-
-        private final String name;
-        private final NavigableMap<K, V> map;
-
-        public MemoryStore(String name) {
-            super();
-            this.name = name;
-            this.map = new TreeMap<>();
-        }
-
-        @Override
-        public String name() {
-            return this.name;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            return this.map.get(key);
-        }
-
-        @Override
-        public void put(K key, V value) {
-            this.map.put(key, value);
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            return this.map.remove(key);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
-        }
-
-        @Override
-        public void flush() {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public void close() {
-            // do-nothing
-        }
-
-        private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
-            private final Iterator<Map.Entry<K, V>> iter;
-
-            public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
-                this.iter = iter;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return iter.hasNext();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                Map.Entry<K, V> entry = iter.next();
-                return new Entry<>(entry.getKey(), entry.getValue());
-            }
-
-            @Override
-            public void remove() {
-                iter.remove();
-            }
-
-            @Override
-            public void close() {
-            }
-
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
deleted file mode 100644
index a346534..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
-/**
- * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- */
-public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
-
-    private final String name;
-    private final int capacity;
-    private final Serdes serdes;
-    private final Time time;
-
-    protected InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
-        this.name = name;
-        this.capacity = capacity;
-        this.serdes = serdes;
-        this.time = time;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    public StateStore get() {
-        MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
-        final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
-        cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
-            @Override
-            public void apply(K key, V value) {
-                store.removed(key);
-            }
-        });
-        return store;
-    }
-
-    private static interface EldestEntryRemovalListener<K, V> {
-        public void apply(K key, V value);
-    }
-
-    protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
-
-        private final String name;
-        private final Map<K, V> map;
-        private final NavigableSet<K> keys;
-        private EldestEntryRemovalListener<K, V> listener;
-
-        public MemoryLRUCache(String name, final int maxCacheSize) {
-            this.name = name;
-            this.keys = new TreeSet<>();
-            // leave room for one extra entry to handle adding an entry before the oldest can be removed
-            this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
-                private static final long serialVersionUID = 1L;
-
-                @Override
-                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-                    if (size() > maxCacheSize) {
-                        K key = eldest.getKey();
-                        keys.remove(key);
-                        if (listener != null) listener.apply(key, eldest.getValue());
-                        return true;
-                    }
-                    return false;
-                }
-            };
-        }
-
-        protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
-            this.listener = listener;
-        }
-
-        @Override
-        public String name() {
-            return this.name;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            return this.map.get(key);
-        }
-
-        @Override
-        public void put(K key, V value) {
-            this.map.put(key, value);
-            this.keys.add(key);
-        }
-
-        @Override
-        public void putAll(List<Entry<K, V>> entries) {
-            for (Entry<K, V> entry : entries)
-                put(entry.key(), entry.value());
-        }
-
-        @Override
-        public V delete(K key) {
-            V value = this.map.remove(key);
-            this.keys.remove(key);
-            return value;
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
-        }
-
-        @Override
-        public void flush() {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public void close() {
-            // do-nothing
-        }
-
-        private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
-            private final Iterator<K> keys;
-            private final Map<K, V> entries;
-            private K lastKey;
-
-            public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
-                this.keys = keys;
-                this.entries = entries;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return keys.hasNext();
-            }
-
-            @Override
-            public Entry<K, V> next() {
-                lastKey = keys.next();
-                return new Entry<>(lastKey, entries.get(lastKey));
-            }
-
-            @Override
-            public void remove() {
-                keys.remove();
-                entries.remove(lastKey);
-            }
-
-            @Override
-            public void close() {
-                // do nothing
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
deleted file mode 100644
index 0fbd4ae..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import java.io.Closeable;
-import java.util.Iterator;
-
-public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>>, Closeable {
-
-    @Override
-    public void close();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
deleted file mode 100644
index e4faed1..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.streams.processor.StateStore;
-
-import java.util.List;
-
-/**
- * A key-value store that supports put/get/delete and range queries.
- *
- * @param <K> The key type
- * @param <V> The value type
- */
-public interface KeyValueStore<K, V> extends StateStore {
-
-    /**
-     * Get the value corresponding to this key
-     *
-     * @param key The key to fetch
-     * @return The value or null if no value is found.
-     * @throws NullPointerException If null is used for key.
-     */
-    abstract public V get(K key);
-
-    /**
-     * Update the value associated with this key
-     *
-     * @param key They key to associate the value to
-     * @param value The value
-     * @throws NullPointerException If null is used for key or value.
-     */
-    abstract public void put(K key, V value);
-
-    /**
-     * Update all the given key/value pairs
-     *
-     * @param entries A list of entries to put into the store.
-     * @throws NullPointerException If null is used for any key or value.
-     */
-    abstract public void putAll(List<Entry<K, V>> entries);
-
-    /**
-     * Delete the value from the store (if there is one)
-     *
-     * @param key The key
-     * @return The old value or null if there is no such key.
-     * @throws NullPointerException If null is used for key.
-     */
-    abstract public V delete(K key);
-
-    /**
-     * Get an iterator over a given range of keys. This iterator MUST be closed after use.
-     *
-     * @param from The first key that could be in the range
-     * @param to The last key that could be in the range
-     * @return The iterator for this range.
-     * @throws NullPointerException If null is used for from or to.
-     */
-    abstract public KeyValueIterator<K, V> range(K from, K to);
-
-    /**
-     * Return an iterator over all keys in the database. This iterator MUST be closed after use.
-     *
-     * @return An iterator of all key/value pairs in the store.
-     */
-    abstract public KeyValueIterator<K, V> all();
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
deleted file mode 100644
index c1ccbd4..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
-
-    protected final KeyValueStore<K, V> inner;
-    protected final Serdes<K, V> serialization;
-    protected final String metricGrp;
-    protected final Time time;
-
-    private final String topic;
-
-    private Sensor putTime;
-    private Sensor getTime;
-    private Sensor deleteTime;
-    private Sensor putAllTime;
-    private Sensor allTime;
-    private Sensor rangeTime;
-    private Sensor flushTime;
-    private Sensor restoreTime;
-    private StreamingMetrics metrics;
-
-    private final Set<K> dirty;
-    private final Set<K> removed;
-    private final int maxDirty;
-    private final int maxRemoved;
-
-    private int partition;
-    private ProcessorContext context;
-
-    // always wrap the logged store with the metered store
-    public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricGrp, Time time) {
-        this.inner = inner;
-        this.serialization = serialization;
-        this.metricGrp = metricGrp;
-        this.time = time != null ? time : new SystemTime();
-        this.topic = inner.name();
-
-        this.dirty = new HashSet<K>();
-        this.removed = new HashSet<K>();
-        this.maxDirty = 100; // TODO: this needs to be configurable
-        this.maxRemoved = 100; // TODO: this needs to be configurable
-    }
-
-    @Override
-    public String name() {
-        return inner.name();
-    }
-
-    @Override
-    public void init(ProcessorContext context) {
-        String name = name();
-        this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
-        this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
-        this.deleteTime = this.metrics.addLatencySensor(metricGrp, name, "delete", "store-name", name);
-        this.putAllTime = this.metrics.addLatencySensor(metricGrp, name, "put-all", "store-name", name);
-        this.allTime = this.metrics.addLatencySensor(metricGrp, name, "all", "store-name", name);
-        this.rangeTime = this.metrics.addLatencySensor(metricGrp, name, "range", "store-name", name);
-        this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name);
-        this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
-
-        this.context = context;
-        this.partition = context.id().partition;
-
-        // register and possibly restore the state from the logs
-        long startNs = time.nanoseconds();
-        inner.init(context);
-        try {
-            final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
-            final Deserializer<V> valDeserializer = serialization.valueDeserializer();
-
-            context.register(this, new StateRestoreCallback() {
-                @Override
-                public void restore(byte[] key, byte[] value) {
-                    inner.put(keyDeserializer.deserialize(topic, key),
-                            valDeserializer.deserialize(topic, value));
-                }
-            });
-        } finally {
-            this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
-        }
-    }
-
-    @Override
-    public boolean persistent() {
-        return inner.persistent();
-    }
-
-    @Override
-    public V get(K key) {
-        long startNs = time.nanoseconds();
-        try {
-            return this.inner.get(key);
-        } finally {
-            this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
-        }
-    }
-
-    @Override
-    public void put(K key, V value) {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.put(key, value);
-
-            this.dirty.add(key);
-            this.removed.remove(key);
-            maybeLogChange();
-        } finally {
-            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
-        }
-    }
-
-    @Override
-    public void putAll(List<Entry<K, V>> entries) {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.putAll(entries);
-
-            for (Entry<K, V> entry : entries) {
-                K key = entry.key();
-                this.dirty.add(key);
-                this.removed.remove(key);
-            }
-
-            maybeLogChange();
-        } finally {
-            this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
-        }
-    }
-
-    @Override
-    public V delete(K key) {
-        long startNs = time.nanoseconds();
-        try {
-            V value = this.inner.delete(key);
-
-            this.dirty.remove(key);
-            this.removed.add(key);
-            maybeLogChange();
-
-            return value;
-        } finally {
-            this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
-        }
-    }
-
-    /**
-     * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
-     * store other than {@link #delete(Object)}.
-     *
-     * @param key the key for the entry that the inner store removed
-     */
-    protected void removed(K key) {
-        this.dirty.remove(key);
-        this.removed.add(key);
-        maybeLogChange();
-    }
-
-    @Override
-    public KeyValueIterator<K, V> range(K from, K to) {
-        return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> all() {
-        return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
-    }
-
-    @Override
-    public void close() {
-        inner.close();
-    }
-
-    @Override
-    public void flush() {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.flush();
-            logChange();
-        } finally {
-            this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
-        }
-    }
-
-    private void maybeLogChange() {
-        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
-            logChange();
-    }
-
-    private void logChange() {
-        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-        if (collector != null) {
-            Serializer<K> keySerializer = serialization.keySerializer();
-            Serializer<V> valueSerializer = serialization.valueSerializer();
-
-            for (K k : this.removed) {
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
-            }
-            for (K k : this.dirty) {
-                V v = this.inner.get(k);
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
-            }
-            this.removed.clear();
-            this.dirty.clear();
-        }
-    }
-
-    private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
-
-        private final KeyValueIterator<K1, V1> iter;
-        private final Sensor sensor;
-        private final long startNs;
-
-        public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
-            this.iter = iter;
-            this.sensor = sensor;
-            this.startNs = time.nanoseconds();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iter.hasNext();
-        }
-
-        @Override
-        public Entry<K1, V1> next() {
-            return iter.next();
-        }
-
-        @Override
-        public void remove() {
-            iter.remove();
-        }
-
-        @Override
-        public void close() {
-            try {
-                iter.close();
-            } finally {
-                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
-            }
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
deleted file mode 100644
index e04de68..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class saves out a map of topic/partition=&gt;offsets to a file. The format of the file is UTF-8 text containing the following:
- * <pre>
- *   &lt;version&gt;
- *   &lt;n&gt;
- *   &lt;topic_name_1&gt; &lt;partition_1&gt; &lt;offset_1&gt;
- *   .
- *   .
- *   .
- *   &lt;topic_name_n&gt; &lt;partition_n&gt; &lt;offset_n&gt;
- * </pre>
- *   The first line contains a number designating the format version (currently 0), the get line contains
- *   a number giving the total number of offsets. Each successive line gives a topic/partition/offset triple
- *   separated by spaces.
- */
-public class OffsetCheckpoint {
-
-    private static final int VERSION = 0;
-
-    private final File file;
-    private final Object lock;
-
-    public OffsetCheckpoint(File file) throws IOException {
-        new File(file + ".tmp").delete(); // try to delete any existing temp files for cleanliness
-        this.file = file;
-        this.lock = new Object();
-    }
-
-    public void write(Map<TopicPartition, Long> offsets) throws IOException {
-        synchronized (lock) {
-            // write to temp file and then swap with the existing file
-            File temp = new File(file.getAbsolutePath() + ".tmp");
-
-            FileOutputStream fileOutputStream = new FileOutputStream(temp);
-            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream));
-            try {
-                writeIntLine(writer, VERSION);
-                writeIntLine(writer, offsets.size());
-
-                // write the entries
-                for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
-                    writeEntry(writer, entry.getKey(), entry.getValue());
-
-                // flush the buffer and then fsync the underlying file
-                writer.flush();
-                fileOutputStream.getFD().sync();
-            } finally {
-                writer.close();
-            }
-
-            // swap new offset checkpoint file with previous one
-            if (!temp.renameTo(file)) {
-                // renameTo() fails on Windows if the destination file exists.
-                file.delete();
-                if (!temp.renameTo(file))
-                    throw new IOException(String.format("File rename from %s to %s failed.",
-                        temp.getAbsolutePath(),
-                        file.getAbsolutePath()));
-            }
-        }
-    }
-
-    private void writeIntLine(BufferedWriter writer, int number) throws IOException {
-        writer.write(Integer.toString(number));
-        writer.newLine();
-    }
-
-    private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException {
-        writer.write(part.topic());
-        writer.write(' ');
-        writer.write(Integer.toString(part.partition()));
-        writer.write(' ');
-        writer.write(Long.toString(offset));
-        writer.newLine();
-    }
-
-    public Map<TopicPartition, Long> read() throws IOException {
-        synchronized (lock) {
-            BufferedReader reader = null;
-            try {
-                reader = new BufferedReader(new FileReader(file));
-            } catch (FileNotFoundException e) {
-                return Collections.emptyMap();
-            }
-
-            try {
-                int version = readInt(reader);
-                switch (version) {
-                    case 0:
-                        int expectedSize = readInt(reader);
-                        Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
-                        String line = reader.readLine();
-                        while (line != null) {
-                            String[] pieces = line.split("\\s+");
-                            if (pieces.length != 3)
-                                throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.",
-                                    line));
-
-                            String topic = pieces[0];
-                            int partition = Integer.parseInt(pieces[1]);
-                            long offset = Long.parseLong(pieces[2]);
-                            offsets.put(new TopicPartition(topic, partition), offset);
-                            line = reader.readLine();
-                        }
-                        if (offsets.size() != expectedSize)
-                            throw new IOException(String.format("Expected %d entries but found only %d",
-                                expectedSize,
-                                offsets.size()));
-                        return offsets;
-
-                    default:
-                        throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
-                }
-            } finally {
-                if (reader != null)
-                    reader.close();
-            }
-        }
-    }
-
-    private int readInt(BufferedReader reader) throws IOException {
-        String line = reader.readLine();
-        if (line == null)
-            throw new EOFException("File ended prematurely.");
-        int val = Integer.parseInt(line);
-        return val;
-    }
-
-    public void delete() throws IOException {
-        file.delete();
-    }
-
-    @Override
-    public String toString() {
-        return this.file.getAbsolutePath();
-    }
-
-}