You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 01:27:34 UTC

[8/8] kafka git commit: MINOR: remove Kafka Streams in 0.9.0

MINOR: remove Kafka Streams in 0.9.0

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Jun Rao

Closes #474 from guozhangwang/removeKStream


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b382d6f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b382d6f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b382d6f

Branch: refs/heads/0.9.0
Commit: 2b382d6f92ed1d6f414529ce8d41089110ed560a
Parents: c258868
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Nov 9 16:33:23 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 9 16:33:23 2015 -0800

----------------------------------------------------------------------
 bin/kafka-run-class.sh                          |   5 -
 build.gradle                                    |  59 +-
 checkstyle/import-control.xml                   |  14 -
 settings.gradle                                 |   2 +-
 .../apache/kafka/streams/KafkaStreaming.java    | 156 -----
 .../apache/kafka/streams/StreamingConfig.java   | 269 ---------
 .../apache/kafka/streams/StreamingMetrics.java  |  27 -
 .../kafka/streams/examples/KStreamJob.java      |  84 ---
 .../kafka/streams/examples/ProcessorJob.java    | 115 ----
 .../examples/WallclockTimestampExtractor.java   |  28 -
 .../apache/kafka/streams/kstream/KStream.java   | 180 ------
 .../kafka/streams/kstream/KStreamBuilder.java   |  74 ---
 .../kafka/streams/kstream/KStreamWindowed.java  |  38 --
 .../apache/kafka/streams/kstream/KeyValue.java  |  34 --
 .../kafka/streams/kstream/KeyValueMapper.java   |  23 -
 .../apache/kafka/streams/kstream/Predicate.java |  30 -
 .../streams/kstream/SlidingWindowSupplier.java  | 266 ---------
 .../kafka/streams/kstream/Transformer.java      |  57 --
 .../streams/kstream/TransformerSupplier.java    |  24 -
 .../kafka/streams/kstream/ValueJoiner.java      |  23 -
 .../kafka/streams/kstream/ValueMapper.java      |  23 -
 .../kafka/streams/kstream/ValueTransformer.java |  56 --
 .../kstream/ValueTransformerSupplier.java       |  24 -
 .../apache/kafka/streams/kstream/Window.java    |  36 --
 .../kafka/streams/kstream/WindowSupplier.java   |  25 -
 .../kstream/internals/FilteredIterator.java     |  63 --
 .../kstream/internals/KStreamBranch.java        |  52 --
 .../kstream/internals/KStreamFilter.java        |  48 --
 .../kstream/internals/KStreamFlatMap.java       |  47 --
 .../kstream/internals/KStreamFlatMapValues.java |  47 --
 .../streams/kstream/internals/KStreamImpl.java  | 227 -------
 .../streams/kstream/internals/KStreamJoin.java  |  93 ---
 .../streams/kstream/internals/KStreamMap.java   |  46 --
 .../kstream/internals/KStreamMapValues.java     |  45 --
 .../kstream/internals/KStreamPassThrough.java   |  37 --
 .../kstream/internals/KStreamTransform.java     |  71 ---
 .../internals/KStreamTransformValues.java       |  69 ---
 .../kstream/internals/KStreamWindow.java        |  68 ---
 .../kstream/internals/KStreamWindowedImpl.java  |  67 ---
 .../kstream/internals/WindowSupport.java        | 159 -----
 .../streams/processor/AbstractProcessor.java    |  71 ---
 .../processor/DefaultPartitionGrouper.java      |  73 ---
 .../streams/processor/PartitionGrouper.java     |  53 --
 .../kafka/streams/processor/Processor.java      |  59 --
 .../streams/processor/ProcessorContext.java     | 101 ----
 .../streams/processor/ProcessorSupplier.java    |  23 -
 .../streams/processor/StateRestoreCallback.java |  27 -
 .../kafka/streams/processor/StateStore.java     |  57 --
 .../streams/processor/StateStoreSupplier.java   |  25 -
 .../apache/kafka/streams/processor/TaskId.java  |  66 ---
 .../streams/processor/TimestampExtractor.java   |  34 --
 .../streams/processor/TopologyBuilder.java      | 498 ----------------
 .../streams/processor/TopologyException.java    |  38 --
 .../KafkaStreamingPartitionAssignor.java        | 135 -----
 .../internals/MinTimestampTracker.java          |  67 ---
 .../processor/internals/PartitionGroup.java     | 170 ------
 .../internals/ProcessorContextImpl.java         | 192 ------
 .../processor/internals/ProcessorNode.java      |  74 ---
 .../internals/ProcessorStateManager.java        | 231 --------
 .../processor/internals/ProcessorTopology.java  |  62 --
 .../processor/internals/PunctuationQueue.java   |  56 --
 .../internals/PunctuationSchedule.java          |  41 --
 .../streams/processor/internals/Punctuator.java |  24 -
 .../streams/processor/internals/QuickUnion.java |  67 ---
 .../processor/internals/RecordCollector.java    |  91 ---
 .../processor/internals/RecordQueue.java        | 140 -----
 .../streams/processor/internals/SinkNode.java   |  64 --
 .../streams/processor/internals/SourceNode.java |  64 --
 .../streams/processor/internals/Stamped.java    |  38 --
 .../processor/internals/StampedRecord.java      |  52 --
 .../streams/processor/internals/StreamTask.java | 378 ------------
 .../processor/internals/StreamThread.java       | 586 -------------------
 .../processor/internals/TimestampTracker.java   |  58 --
 .../org/apache/kafka/streams/state/Entry.java   |  42 --
 .../state/InMemoryKeyValueStoreSupplier.java    | 155 -----
 .../state/InMemoryLRUCacheStoreSupplier.java    | 195 ------
 .../kafka/streams/state/KeyValueIterator.java   |  29 -
 .../kafka/streams/state/KeyValueStore.java      |  86 ---
 .../streams/state/MeteredKeyValueStore.java     | 278 ---------
 .../kafka/streams/state/OffsetCheckpoint.java   | 172 ------
 .../state/RocksDBKeyValueStoreSupplier.java     | 298 ----------
 .../org/apache/kafka/streams/state/Serdes.java  | 161 -----
 .../org/apache/kafka/streams/state/Stores.java  | 278 ---------
 .../streams/kstream/KStreamBuilderTest.java     |  51 --
 .../kstream/internals/FilteredIteratorTest.java |  94 ---
 .../kstream/internals/KStreamBranchTest.java    |  90 ---
 .../kstream/internals/KStreamFilterTest.java    |  85 ---
 .../kstream/internals/KStreamFlatMapTest.java   |  80 ---
 .../internals/KStreamFlatMapValuesTest.java     |  77 ---
 .../kstream/internals/KStreamImplTest.java      | 137 -----
 .../kstream/internals/KStreamJoinTest.java      | 195 ------
 .../kstream/internals/KStreamMapTest.java       |  73 ---
 .../kstream/internals/KStreamMapValuesTest.java |  71 ---
 .../kstream/internals/KStreamTransformTest.java |  93 ---
 .../internals/KStreamTransformValuesTest.java   |  92 ---
 .../kstream/internals/KStreamWindowedTest.java  |  91 ---
 .../processor/DefaultPartitionGrouperTest.java  |  84 ---
 .../streams/processor/TopologyBuilderTest.java  | 254 --------
 .../internals/MinTimestampTrackerTest.java      |  93 ---
 .../processor/internals/PartitionGroupTest.java | 102 ----
 .../internals/ProcessorStateManagerTest.java    | 407 -------------
 .../internals/ProcessorTopologyTest.java        | 311 ----------
 .../internals/PunctuationQueueTest.java         |  85 ---
 .../processor/internals/QuickUnionTest.java     |  97 ---
 .../processor/internals/RecordQueueTest.java    | 116 ----
 .../processor/internals/StreamTaskTest.java     | 211 -------
 .../processor/internals/StreamThreadTest.java   | 464 ---------------
 .../state/AbstractKeyValueStoreTest.java        | 193 ------
 .../state/InMemoryKeyValueStoreTest.java        |  50 --
 .../state/InMemoryLRUCacheStoreTest.java        | 155 -----
 .../streams/state/KeyValueStoreTestDriver.java  | 464 ---------------
 .../streams/state/RocksDBKeyValueStoreTest.java |  52 --
 .../apache/kafka/streams/state/StateUtils.java  |  77 ---
 .../apache/kafka/test/KStreamTestDriver.java    |  95 ---
 .../apache/kafka/test/MockProcessorContext.java | 173 ------
 .../kafka/test/MockProcessorSupplier.java       |  59 --
 .../org/apache/kafka/test/MockSourceNode.java   |  46 --
 .../kafka/test/MockStateStoreSupplier.java      |  97 ---
 .../kafka/test/MockTimestampExtractor.java      |  30 -
 .../kafka/test/ProcessorTopologyTestDriver.java | 338 -----------
 .../apache/kafka/test/UnlimitedWindowDef.java   | 104 ----
 121 files changed, 2 insertions(+), 13604 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index b18a9cf..88d575b 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -57,11 +57,6 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/stream/build/libs/kafka-streams*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
 for file in $base_dir/tools/build/libs/kafka-tools*.jar;
 do
   CLASSPATH=$CLASSPATH:$file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4f5e482..fc7dd79 100644
--- a/build.gradle
+++ b/build.gradle
@@ -229,7 +229,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
 }
 
 def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
-def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
+def pkgs = ['clients', 'examples', 'log4j-appender', 'tools'] + connectPkgs
 
 tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
 tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) { }
@@ -543,63 +543,6 @@ project(':tools') {
     test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':streams') {
-    apply plugin: 'checkstyle'
-    archivesBaseName = "kafka-streams"
-
-    dependencies {
-        compile project(':clients')
-        compile "$slf4jlog4j"
-        compile 'org.rocksdb:rocksdbjni:3.10.1'
-
-        testCompile "$junit"
-        testCompile project(path: ':clients', configuration: 'archives')
-    }
-
-    task testJar(type: Jar) {
-        classifier = 'test'
-        from sourceSets.test.output
-    }
-
-    test {
-        testLogging {
-            events "passed", "skipped", "failed"
-            exceptionFormat = 'full'
-        }
-    }
-
-    javadoc {
-        include "**/org/apache/kafka/streams/*"
-    }
-
-    tasks.create(name: "copyDependantLibs", type: Copy) {
-        from (configurations.testRuntime) {
-            include('slf4j-log4j12*')
-        }
-        from (configurations.runtime) {
-            exclude('kafka-clients*')
-        }
-        into "$buildDir/dependant-libs-${scalaVersion}"
-    }
-
-    jar {
-        dependsOn 'copyDependantLibs'
-    }
-
-    artifacts {
-      archives testJar
-    }
-
-    configurations {
-      archives.extendsFrom (testCompile)
-    }
-
-    checkstyle {
-        configFile = new File(rootDir, "checkstyle/checkstyle.xml")
-    }
-    test.dependsOn('checkstyleMain', 'checkstyleTest')
-}
-
 project(':log4j-appender') {
   apply plugin: 'checkstyle'
   archivesBaseName = "kafka-log4j-appender"

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 95ea3b7..f3a1c38 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -115,20 +115,6 @@
     <allow pkg="org.apache.log4j" />
   </subpackage>
 
-  <subpackage name="streams">
-    <allow pkg="org.apache.kafka.common"/>
-    <allow pkg="org.apache.kafka.test"/>
-    <allow pkg="org.apache.kafka.clients"/>
-    <allow pkg="org.apache.kafka.clients.producer" exact-match="true"/>
-    <allow pkg="org.apache.kafka.clients.consumer" exact-match="true"/>
-
-    <allow pkg="org.apache.kafka.streams"/>
-
-    <subpackage name="state">
-      <allow pkg="org.rocksdb" />
-    </subpackage>
-  </subpackage>
-
   <subpackage name="log4jappender">
     <allow pkg="org.apache.log4j" />
     <allow pkg="org.apache.kafka.clients" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 3d69fac..df602ef 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,5 +14,5 @@
 // limitations under the License.
 
 apply from: file('scala.gradle')
-include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
+include 'core', 'examples', 'clients', 'tools', 'log4j-appender',
         'connect:api', 'connect:runtime', 'connect:json', 'connect:file'

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
deleted file mode 100644
index d274fb9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Kafka Streaming allows for performing continuous computation on input coming from one or more input topics and
- * sends output to zero or more output topics.
- * <p>
- * This processing is defined by using the {@link TopologyBuilder} class or its subclass KStreamBuilder to specify
- * the transformation.
- * The {@link KafkaStreaming} instance will be responsible for the lifecycle of these processors. It will instantiate and
- * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
- * <p>
- * This streaming instance will co-ordinate with any other instances (whether in this same process, on other processes
- * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being
- * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shutdown or
- * started in the appropriate processes to balance processing load.
- * <p>
- * Internally the {@link KafkaStreaming} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
- * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output.
- * <p>
- * A simple example might look like this:
- * <pre>
- *    Map&lt;String, Object&gt; props = new HashMap&lt;&gt;();
- *    props.put("bootstrap.servers", "localhost:4242");
- *    props.put("key.deserializer", StringDeserializer.class);
- *    props.put("value.deserializer", StringDeserializer.class);
- *    props.put("key.serializer", StringSerializer.class);
- *    props.put("value.serializer", IntegerSerializer.class);
- *    props.put("timestamp.extractor", MyTimestampExtractor.class);
- *    StreamingConfig config = new StreamingConfig(props);
- *
- *    KStreamBuilder builder = new KStreamBuilder();
- *    builder.from("topic1").mapValue(value -&gt; value.length()).to("topic2");
- *
- *    KafkaStreaming streaming = new KafkaStreaming(builder, config);
- *    streaming.start();
- * </pre>
- *
- */
-public class KafkaStreaming {
-
-    private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
-    private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
-    private static final String JMX_PREFIX = "kafka.streaming";
-
-    private final Time time;
-
-    // container states
-    private static final int CREATED = 0;
-    private static final int RUNNING = 1;
-    private static final int STOPPED = 2;
-    private int state = CREATED;
-
-    private final StreamThread[] threads;
-
-    private String clientId;
-    private final Metrics metrics;
-
-    public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
-        // create the metrics
-        this.time = new SystemTime();
-
-        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
-            .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                TimeUnit.MILLISECONDS);
-        clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
-        if (clientId.length() <= 0)
-            clientId = "streaming-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
-        List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG,
-            MetricsReporter.class);
-        reporters.add(new JmxReporter(JMX_PREFIX));
-        this.metrics = new Metrics(metricConfig, reporters, time);
-
-        this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
-        for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder, config, this.clientId, this.metrics, this.time);
-        }
-    }
-
-    /**
-     * Start the stream process by starting all its threads
-     */
-    public synchronized void start() {
-        log.debug("Starting Kafka Stream process");
-
-        if (state == CREATED) {
-            for (StreamThread thread : threads)
-                thread.start();
-
-            state = RUNNING;
-
-            log.info("Started Kafka Stream process");
-        } else {
-            throw new IllegalStateException("This process was already started.");
-        }
-    }
-
-    /**
-     * Shutdown this stream process by signaling the threads to stop,
-     * wait for them to join and clean up the process instance.
-     */
-    public synchronized void close() {
-        log.debug("Stopping Kafka Stream process");
-
-        if (state == RUNNING) {
-            // signal the threads to stop and wait
-            for (StreamThread thread : threads)
-                thread.close();
-
-            for (StreamThread thread : threads) {
-                try {
-                    thread.join();
-                } catch (InterruptedException ex) {
-                    Thread.interrupted();
-                }
-            }
-
-            state = STOPPED;
-
-            log.info("Stopped Kafka Stream process");
-        } else {
-            throw new IllegalStateException("This process has not started yet.");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
deleted file mode 100644
index 88bd844..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
-import org.apache.kafka.streams.processor.PartitionGrouper;
-import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
-
-import java.util.Map;
-
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-
-public class StreamingConfig extends AbstractConfig {
-
-    private static final ConfigDef CONFIG;
-
-    /** <code>state.dir</code> */
-    public static final String STATE_DIR_CONFIG = "state.dir";
-    private static final String STATE_DIR_DOC = "Directory location for state store.";
-
-    /** <code>commit.interval.ms</code> */
-    public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
-    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
-
-    /** <code>poll.ms</code> */
-    public static final String POLL_MS_CONFIG = "poll.ms";
-    private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
-
-    /** <code>num.stream.threads</code> */
-    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
-    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
-
-    /** <code>buffered.records.per.partition</code> */
-    public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
-    private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
-
-    /** <code>state.cleanup.delay.ms</code> */
-    public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
-    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
-
-    /** <code>total.records.to.process</code> */
-    public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
-    private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
-
-    /** <code>window.time.ms</code> */
-    public static final String WINDOW_TIME_MS_CONFIG = "window.time.ms";
-    private static final String WINDOW_TIME_MS_DOC = "Setting this to a non-negative value will cause the processor to get called "
-                                                     + "with this frequency even if there is no message.";
-
-    /** <code>timestamp.extractor</code> */
-    public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
-    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
-
-    /** <code>partition.grouper</code> */
-    public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
-    private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
-
-    /** <code>client.id</code> */
-    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
-
-    /** <code>key.serializer</code> */
-    public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
-
-    /** <code>value.serializer</code> */
-    public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-
-    /** <code>key.deserializer</code> */
-    public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-
-    /** <code>value.deserializer</code> */
-    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-
-    /** <code>metrics.sample.window.ms</code> */
-    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
-
-    /** <code>metrics.num.samples</code> */
-    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
-
-    /** <code>metric.reporters</code> */
-    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
-
-    /**
-     * <code>bootstrap.servers</code>
-     */
-    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
-
-    private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
-
-    static {
-        CONFIG = new ConfigDef().define(CLIENT_ID_CONFIG,
-                                        Type.STRING,
-                                        "",
-                                        Importance.MEDIUM,
-                                        CommonClientConfigs.CLIENT_ID_DOC)
-                                .define(STATE_DIR_CONFIG,
-                                        Type.STRING,
-                                        SYSTEM_TEMP_DIRECTORY,
-                                        Importance.MEDIUM,
-                                        STATE_DIR_DOC)
-                                .define(COMMIT_INTERVAL_MS_CONFIG,
-                                        Type.LONG,
-                                        30000,
-                                        Importance.HIGH,
-                                        COMMIT_INTERVAL_MS_DOC)
-                                .define(POLL_MS_CONFIG,
-                                        Type.LONG,
-                                        100,
-                                        Importance.LOW,
-                                        POLL_MS_DOC)
-                                .define(NUM_STREAM_THREADS_CONFIG,
-                                        Type.INT,
-                                        1,
-                                        Importance.LOW,
-                                        NUM_STREAM_THREADS_DOC)
-                                .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
-                                        Type.INT,
-                                        1000,
-                                        Importance.LOW,
-                                        BUFFERED_RECORDS_PER_PARTITION_DOC)
-                                .define(STATE_CLEANUP_DELAY_MS_CONFIG,
-                                        Type.LONG,
-                                        60000,
-                                        Importance.LOW,
-                                        STATE_CLEANUP_DELAY_MS_DOC)
-                                .define(TOTAL_RECORDS_TO_PROCESS,
-                                        Type.LONG,
-                                        -1L,
-                                        Importance.LOW,
-                                        TOTAL_RECORDS_TO_DOC)
-                                .define(WINDOW_TIME_MS_CONFIG,
-                                        Type.LONG,
-                                        -1L,
-                                        Importance.MEDIUM,
-                                        WINDOW_TIME_MS_DOC)
-                                .define(KEY_SERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
-                                .define(VALUE_SERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
-                                .define(KEY_DESERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
-                                .define(VALUE_DESERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
-                                .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        TIMESTAMP_EXTRACTOR_CLASS_DOC)
-                                .define(PARTITION_GROUPER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        DefaultPartitionGrouper.class,
-                                        Importance.HIGH,
-                                        PARTITION_GROUPER_CLASS_DOC)
-                                .define(BOOTSTRAP_SERVERS_CONFIG,
-                                        Type.STRING,
-                                        Importance.HIGH,
-                                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
-                                .define(METRIC_REPORTER_CLASSES_CONFIG,
-                                        Type.LIST,
-                                        "",
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
-                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
-                                        Type.LONG,
-                                        30000,
-                                        atLeast(0),
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
-                                .define(METRICS_NUM_SAMPLES_CONFIG,
-                                        Type.INT,
-                                        2,
-                                        atLeast(1),
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
-    }
-
-    public static class InternalConfig {
-        public static final String PARTITION_GROUPER_INSTANCE = "__partition.grouper.instance__";
-    }
-
-    public StreamingConfig(Map<?, ?> props) {
-        super(CONFIG, props);
-    }
-
-    public Map<String, Object> getConsumerConfigs(PartitionGrouper partitionGrouper) {
-        Map<String, Object> props = getConsumerConfigs();
-        props.put(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE, partitionGrouper);
-        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
-        return props;
-    }
-
-    public Map<String, Object> getConsumerConfigs() {
-        Map<String, Object> props = this.originals();
-
-        // set consumer default property values
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-        // remove properties that are not required for consumers
-        props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG);
-        props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-        props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
-
-        return props;
-    }
-
-    public Map<String, Object> getProducerConfigs() {
-        Map<String, Object> props = this.originals();
-
-        // set producer default property values
-        props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
-
-        // remove properties that are not required for producers
-        props.remove(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-        props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-        props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
-
-        return props;
-    }
-
-    public Serializer keySerializer() {
-        return getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
-    }
-
-    public Serializer valueSerializer() {
-        return getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
-    }
-
-    public Deserializer keyDeserializer() {
-        return getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-    }
-
-    public Deserializer valueDeserializer() {
-        return getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-    }
-
-    public static void main(String[] args) {
-        System.out.println(CONFIG.toHtmlTable());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
deleted file mode 100644
index ebf80b3..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.metrics.Sensor;
-
-public interface StreamingMetrics {
-
-    Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
-
-    void recordLatency(Sensor sensor, long startNs, long endNs);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
deleted file mode 100644
index 87368c1..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.KafkaStreaming;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Predicate;
-
-import java.util.Properties;
-
-public class KStreamJob {
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-KStream-Job");
-        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
-        StreamingConfig config = new StreamingConfig(props);
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        KStream<String, String> stream1 = builder.from("topic1");
-
-        KStream<String, Integer> stream2 =
-            stream1.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
-                @Override
-                public KeyValue<String, Integer> apply(String key, String value) {
-                    return new KeyValue<>(key, new Integer(value));
-                }
-            }).filter(new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return true;
-                }
-            });
-
-        KStream<String, Integer>[] streams = stream2.branch(
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return (value % 2) == 0;
-                }
-            },
-            new Predicate<String, Integer>() {
-                @Override
-                public boolean test(String key, Integer value) {
-                    return true;
-                }
-            }
-        );
-
-        streams[0].to("topic2");
-        streams[1].to("topic3");
-
-        KafkaStreaming kstream = new KafkaStreaming(builder, config);
-        kstream.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
deleted file mode 100644
index 3274aae..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreaming;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.Entry;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-
-import java.util.Properties;
-
-public class ProcessorJob {
-
-    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
-
-        @Override
-        public Processor<String, String> get() {
-            return new Processor<String, String>() {
-                private ProcessorContext context;
-                private KeyValueStore<String, Integer> kvStore;
-
-                @Override
-                @SuppressWarnings("unchecked")
-                public void init(ProcessorContext context) {
-                    this.context = context;
-                    this.context.schedule(1000);
-                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("local-state");
-                }
-
-                @Override
-                public void process(String key, String value) {
-                    Integer oldValue = this.kvStore.get(key);
-                    Integer newValue = Integer.parseInt(value);
-                    if (oldValue == null) {
-                        this.kvStore.put(key, newValue);
-                    } else {
-                        this.kvStore.put(key, oldValue + newValue);
-                    }
-
-                    context.commit();
-                }
-
-                @Override
-                public void punctuate(long timestamp) {
-                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
-
-                    while (iter.hasNext()) {
-                        Entry<String, Integer> entry = iter.next();
-
-                        System.out.println("[" + entry.key() + ", " + entry.value() + "]");
-
-                        context.forward(entry.key(), entry.value());
-                    }
-
-                    iter.close();
-                }
-
-                @Override
-                public void close() {
-                    this.kvStore.close();
-                }
-            };
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        Properties props = new Properties();
-        props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
-        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
-        StreamingConfig config = new StreamingConfig(props);
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
-
-        builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
-        builder.addStateStore(Stores.create("local-state", config).withStringKeys().withIntegerValues().inMemory().build());
-        builder.connectProcessorAndStateStores("local-state", "PROCESS");
-
-        builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
-
-        KafkaStreaming streaming = new KafkaStreaming(builder, config);
-        streaming.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
deleted file mode 100644
index 26281d6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-public class WallclockTimestampExtractor implements TimestampExtractor {
-    @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
-        return System.currentTimeMillis();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
deleted file mode 100644
index 8f0794c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-/**
- * KStream is an abstraction of a stream of key-value pairs.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- */
-public interface KStream<K, V> {
-
-    /**
-     * Creates a new stream consisting of all elements of this stream which satisfy a predicate
-     *
-     * @param predicate the instance of Predicate
-     * @return the stream with only those elements that satisfy the predicate
-     */
-    KStream<K, V> filter(Predicate<K, V> predicate);
-
-    /**
-     * Creates a new stream consisting of all elements of this stream which do not satisfy a predicate
-     *
-     * @param predicate the instance of Predicate
-     * @return the stream with only those elements that do not satisfy the predicate
-     */
-    KStream<K, V> filterOut(Predicate<K, V> predicate);
-
-    /**
-     * Creates a new stream by transforming each element in this stream into a different element in the new stream.
-     *
-     * @param mapper the instance of KeyValueMapper
-     * @param <K1>   the key type of the new stream
-     * @param <V1>   the value type of the new stream
-     * @return the mapped stream
-     */
-    <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
-
-    /**
-     * Creates a new stream by transforming each value in this stream into a different value in the new stream.
-     *
-     * @param mapper the instance of ValueMapper
-     * @param <V1>   the value type of the new stream
-     * @return the mapped stream
-     */
-    <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
-
-    /**
-     * Creates a new stream by transforming each element in this stream into zero or more elements in the new stream.
-     *
-     * @param mapper the instance of KeyValueMapper
-     * @param <K1>   the key type of the new stream
-     * @param <V1>   the value type of the new stream
-     * @return the mapped stream
-     */
-    <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
-
-    /**
-     * Creates a new stream by transforming each value in this stream into zero or more values in the new stream.
-     *
-     * @param processor the instance of ValueMapper
-     * @param <V1>      the value type of the new stream
-     * @return the mapped stream
-     */
-    <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);
-
-    /**
-     * Creates a new windowed stream using a specified window instance.
-     *
-     * @param windowDef the instance of WindowSupplier
-     * @return the windowed stream
-     */
-    KStreamWindowed<K, V> with(WindowSupplier<K, V> windowDef);
-
-    /**
-     * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in
-     * the supplied predicates, in the same order. Predicates are evaluated in order. An element is streamed to
-     * the stream corresponding to the first predicate that evaluates to true.
-     * An element will be dropped if none of the predicates evaluate to true.
-     *
-     * @param predicates the ordered list of Predicate instances
-     * @return the new streams that each contain those elements for which their Predicate evaluated to true.
-     */
-    KStream<K, V>[] branch(Predicate<K, V>... predicates);
-
-    /**
-     * Sends key-value to a topic, also creates a new stream from the topic.
-     * This is equivalent to calling to(topic) and from(topic).
-     *
-     * @param topic           the topic name
-     * @param <K1>            the key type of the new stream
-     * @param <V1>            the value type of the new stream
-     * @return the new stream that consumes the given topic
-     */
-    <K1, V1> KStream<K1, V1> through(String topic);
-
-    /**
-     * Sends key-value to a topic, also creates a new stream from the topic.
-     * This is equivalent to calling to(topic) and from(topic).
-     *
-     * @param topic           the topic name
-     * @param keySerializer   key serializer used to send key-value pairs,
-     *                        if not specified the default key serializer defined in the configuration will be used
-     * @param valSerializer   value serializer used to send key-value pairs,
-     *                        if not specified the default value serializer defined in the configuration will be used
-     * @param keyDeserializer key deserializer used to create the new KStream,
-     *                        if not specified the default key deserializer defined in the configuration will be used
-     * @param valDeserializer value deserializer used to create the new KStream,
-     *                        if not specified the default value deserializer defined in the configuration will be used
-     * @param <K1>            the key type of the new stream
-     * @param <V1>            the value type of the new stream
-     * @return the new stream that consumes the given topic
-     */
-    <K1, V1> KStream<K1, V1> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K1> keyDeserializer, Deserializer<V1> valDeserializer);
-
-    /**
-     * Sends key-value to a topic using default serializers specified in the config.
-     *
-     * @param topic         the topic name
-     */
-    void to(String topic);
-
-    /**
-     * Sends key-value to a topic.
-     *
-     * @param topic         the topic name
-     * @param keySerializer key serializer used to send key-value pairs,
-     *                      if not specified the default serializer defined in the configs will be used
-     * @param valSerializer value serializer used to send key-value pairs,
-     *                      if not specified the default serializer defined in the configs will be used
-     */
-    void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer);
-
-    /**
-     * Applies a stateful transformation to all elements in this stream.
-     *
-     * @param transformerSupplier the instance of TransformerSupplier
-     * @param stateStoreNames the names of the state store used by the processor
-     * @return KStream
-     */
-    <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames);
-
-    /**
-     * Applies a stateful transformation to all values in this stream.
-     *
-     * @param valueTransformerSupplier the instance of ValueTransformerSupplier
-     * @param stateStoreNames the names of the state store used by the processor
-     * @return KStream
-     */
-    <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
-
-    /**
-     * Processes all elements in this stream by applying a processor.
-     * This method returns no value (its return type is void).
-     *
-     * @param processorSupplier the supplier of the Processor to use
-     * @param stateStoreNames the names of the state store used by the processor
-     */
-    void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
deleted file mode 100644
index c8a8bd3..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.kstream.internals.KStreamImpl;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-
-import java.util.Collections;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * KStreamBuilder is the class to create KStream instances.
- */
-public class KStreamBuilder extends TopologyBuilder {
-
-    private final AtomicInteger index = new AtomicInteger(0);
-
-    public KStreamBuilder() {
-        super();
-    }
-
-    /**
-     * Creates a KStream instance for the specified topics.
-     * The default deserializers specified in the config are used.
-     *
-     * @param topics          the topic names, if empty default to all the topics in the config
-     * @return KStream
-     */
-    public <K, V> KStream<K, V> from(String... topics) {
-        String name = newName(KStreamImpl.SOURCE_NAME);
-
-        addSource(name, topics);
-
-        return new KStreamImpl<>(this, name, Collections.singleton(name));
-    }
-
-    /**
-     * Creates a KStream instance for the specified topics.
-     *
-     * @param keyDeserializer key deserializer used to read this source KStream,
-     *                        if not specified the default deserializer defined in the configs will be used
-     * @param valDeserializer value deserializer used to read this source KStream,
-     *                        if not specified the default deserializer defined in the configs will be used
-     * @param topics          the topic names, if empty default to all the topics in the config
-     * @return KStream
-     */
-    public <K, V> KStream<K, V> from(Deserializer<? extends K> keyDeserializer, Deserializer<? extends V> valDeserializer, String... topics) {
-        String name = newName(KStreamImpl.SOURCE_NAME);
-
-        addSource(name, keyDeserializer, valDeserializer, topics);
-
-        return new KStreamImpl<>(this, name, Collections.singleton(name));
-    }
-
-    public String newName(String prefix) {
-        return prefix + String.format("%010d", index.getAndIncrement());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
deleted file mode 100644
index 4d73128..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-/**
- * KStreamWindowed is an abstraction of a stream of key-value pairs with a window.
- */
-public interface KStreamWindowed<K, V> extends KStream<K, V> {
-
-    /**
-     * Creates a new stream by joining this windowed stream with the other windowed stream.
-     * Each element arriving from either stream is joined with the elements in the window of the other stream.
-     * The resulting values are computed by applying a joiner.
-     *
-     * @param other  the other windowed stream
-     * @param joiner ValueJoiner
-     * @param <V1>   the value type of the other stream
-     * @param <V2>   the value type of the new stream
-     * @return KStream
-     */
-    <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
deleted file mode 100644
index f633f6e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public class KeyValue<K, V> {
-
-    public final K key;
-    public final V value;
-
-    public KeyValue(K key, V value) {
-        this.key = key;
-        this.value = value;
-    }
-
-    public static <K, V> KeyValue<K, V> pair(K key, V value) {
-        return new KeyValue<>(key, value);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
deleted file mode 100644
index 62b07f6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface KeyValueMapper<K, V, R> {
-
-    R apply(K key, V value);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
deleted file mode 100644
index c73622e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-/**
- * Represents a predicate (boolean-valued function) of two arguments.
- *
- * @param <K> the type of key
- * @param <V> the type of value
- */
-public interface Predicate<K, V> {
-
-    boolean test(K key, V value);
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
deleted file mode 100644
index 0cf969f..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.internals.FilteredIterator;
-import org.apache.kafka.streams.kstream.internals.WindowSupport;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.Stamped;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-
-public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
-    private final String name;
-    private final long duration;
-    private final int maxCount;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
-
-    public SlidingWindowSupplier(
-            String name,
-            long duration,
-            int maxCount,
-            Serializer<K> keySerializer,
-            Serializer<V> valueSerializer,
-            Deserializer<K> keyDeseriaizer,
-            Deserializer<V> valueDeserializer) {
-        this.name = name;
-        this.duration = duration;
-        this.maxCount = maxCount;
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
-        this.keyDeserializer = keyDeseriaizer;
-        this.valueDeserializer = valueDeserializer;
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public Window<K, V> get() {
-        return new SlidingWindow();
-    }
-
-    public class SlidingWindow extends WindowSupport implements Window<K, V> {
-        private final Object lock = new Object();
-        private ProcessorContext context;
-        private int partition;
-        private int slotNum; // used as a key for Kafka log compaction
-        private LinkedList<K> list = new LinkedList<K>();
-        private HashMap<K, ValueList<V>> map = new HashMap<>();
-
-        @Override
-        public void init(ProcessorContext context) {
-            this.context = context;
-            this.partition = context.id().partition;
-            SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
-            context.register(this, restoreFunc);
-
-            for (ValueList<V> valueList : map.values()) {
-                valueList.clearDirtyValues();
-            }
-            this.slotNum = restoreFunc.slotNum;
-        }
-
-        @Override
-        public Iterator<V> findAfter(K key, final long timestamp) {
-            return find(key, timestamp, timestamp + duration);
-        }
-
-        @Override
-        public Iterator<V> findBefore(K key, final long timestamp) {
-            return find(key, timestamp - duration, timestamp);
-        }
-
-        @Override
-        public Iterator<V> find(K key, final long timestamp) {
-            return find(key, timestamp - duration, timestamp + duration);
-        }
-
-        /*
-         * finds items in the window between startTime and endTime (both inclusive)
-         */
-        private Iterator<V> find(K key, final long startTime, final long endTime) {
-            final ValueList<V> values = map.get(key);
-
-            if (values == null) {
-                return Collections.emptyIterator();
-            } else {
-                return new FilteredIterator<V, Value<V>>(values.iterator()) {
-                    @Override
-                    protected V filter(Value<V> item) {
-                        if (startTime <= item.timestamp && item.timestamp <= endTime)
-                            return item.value;
-                        else
-                            return null;
-                    }
-                };
-            }
-        }
-
-        @Override
-        public void put(K key, V value, long timestamp) {
-            synchronized (lock) {
-                slotNum++;
-
-                list.offerLast(key);
-
-                ValueList<V> values = map.get(key);
-                if (values == null) {
-                    values = new ValueList<>();
-                    map.put(key, values);
-                }
-
-                values.add(slotNum, value, timestamp);
-            }
-            evictExcess();
-            evictExpired(timestamp - duration);
-        }
-
-        private void evictExcess() {
-            while (list.size() > maxCount) {
-                K oldestKey = list.pollFirst();
-
-                ValueList<V> values = map.get(oldestKey);
-                values.removeFirst();
-
-                if (values.isEmpty()) map.remove(oldestKey);
-            }
-        }
-
-        private void evictExpired(long cutoffTime) {
-            while (true) {
-                K oldestKey = list.peekFirst();
-
-                ValueList<V> values = map.get(oldestKey);
-                Stamped<V> oldestValue = values.first();
-
-                if (oldestValue.timestamp < cutoffTime) {
-                    list.pollFirst();
-                    values.removeFirst();
-
-                    if (values.isEmpty()) map.remove(oldestKey);
-                } else {
-                    break;
-                }
-            }
-        }
-
-        @Override
-        public String name() {
-            return name;
-        }
-
-        @Override
-        public void flush() {
-            IntegerSerializer intSerializer = new IntegerSerializer();
-            ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
-
-            RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-
-            for (Map.Entry<K, ValueList<V>> entry : map.entrySet()) {
-                ValueList<V> values = entry.getValue();
-                if (values.hasDirtyValues()) {
-                    K key = entry.getKey();
-
-                    byte[] keyBytes = keySerializer.serialize(name, key);
-
-                    Iterator<Value<V>> iterator = values.dirtyValueIterator();
-                    while (iterator.hasNext()) {
-                        Value<V> dirtyValue = iterator.next();
-                        byte[] slot = intSerializer.serialize("", dirtyValue.slotNum);
-                        byte[] valBytes = valueSerializer.serialize(name, dirtyValue.value);
-
-                        byte[] combined = new byte[8 + 4 + keyBytes.length + 4 + valBytes.length];
-
-                        int offset = 0;
-                        offset += putLong(combined, offset, dirtyValue.timestamp);
-                        offset += puts(combined, offset, keyBytes);
-                        offset += puts(combined, offset, valBytes);
-
-                        if (offset != combined.length)
-                            throw new IllegalStateException("serialized length does not match");
-
-                        collector.send(new ProducerRecord<>(name, partition, slot, combined), byteArraySerializer, byteArraySerializer);
-                    }
-                    values.clearDirtyValues();
-                }
-            }
-        }
-
-        @Override
-        public void close() {
-            // TODO
-        }
-
-        @Override
-        public boolean persistent() {
-            // TODO: should not be persistent, right?
-            return false;
-        }
-
-        private class SlidingWindowRegistryCallback implements StateRestoreCallback {
-
-            final IntegerDeserializer intDeserializer;
-            int slotNum = 0;
-
-            SlidingWindowRegistryCallback() {
-                intDeserializer = new IntegerDeserializer();
-            }
-
-            @Override
-            public void restore(byte[] slot, byte[] bytes) {
-
-                slotNum = intDeserializer.deserialize("", slot);
-
-                int offset = 0;
-                // timestamp
-                long timestamp = getLong(bytes, offset);
-                offset += 8;
-                // key
-                int length = getInt(bytes, offset);
-                offset += 4;
-                K key = deserialize(bytes, offset, length, name, keyDeserializer);
-                offset += length;
-                // value
-                length = getInt(bytes, offset);
-                offset += 4;
-                V value = deserialize(bytes, offset, length, name, valueDeserializer);
-
-                put(key, value, timestamp);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
deleted file mode 100644
index b67f619..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-public interface Transformer<K, V, R> {
-
-    /**
-     * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
-     * that contains it is initialized.
-     * <p>
-     * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
-     * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
-     *
-     * @param context the context; may not be null
-     */
-    void init(ProcessorContext context);
-
-    /**
-     * Transform the message with the given key and value.
-     *
-     * @param key the key for the message
-     * @param value the value for the message
-     * @return new value
-     */
-    R transform(K key, V value);
-
-    /**
-     * Perform any periodic operations, if this transformer {@link ProcessorContext#schedule(long) scheduled itself} with the context
-     * during {@link #init(ProcessorContext) initialization}.
-     *
-     * @param timestamp the stream time when this method is being called
-     */
-    void punctuate(long timestamp);
-
-    /**
-     * Close this transformer and clean up any resources.
-     */
-    void close();
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
deleted file mode 100644
index 2c2d8dd..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface TransformerSupplier<K, V, R> {
-
-    Transformer<K, V, R> get();
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
deleted file mode 100644
index 93fc359..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface ValueJoiner<V1, V2, R> {
-
-    R apply(V1 value1, V2 value2);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
deleted file mode 100644
index a32423d..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface ValueMapper<V1, V2> {
-
-    V2 apply(V1 value);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
deleted file mode 100644
index 5b9e2ff..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-public interface ValueTransformer<V, R> {
-
-    /**
-     * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
-     * that contains it is initialized.
-     * <p>
-     * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
-     * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
-     *
-     * @param context the context; may not be null
-     */
-    void init(ProcessorContext context);
-
-    /**
-     * Transform the message with the given value.
-     *
-     * @param value the value for the message
-     * @return new value
-     */
-    R transform(V value);
-
-    /**
-     * Perform any periodic operations, if this transformer {@link ProcessorContext#schedule(long) scheduled itself} with the context
-     * during {@link #init(ProcessorContext) initialization}.
-     *
-     * @param timestamp the stream time when this method is being called
-     */
-    void punctuate(long timestamp);
-
-    /**
-     * Close this transformer and clean up any resources.
-     */
-    void close();
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
deleted file mode 100644
index 5c053c7..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface ValueTransformerSupplier<V, R> {
-
-    ValueTransformer<V, R> get();
-
-}