You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 01:27:33 UTC

[7/8] kafka git commit: MINOR: remove Kafka Streams in 0.9.0

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
deleted file mode 100644
index a1456f6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-
-import java.util.Iterator;
-
-public interface Window<K, V> extends StateStore {
-
-    void init(ProcessorContext context);
-
-    Iterator<V> find(K key, long timestamp);
-
-    Iterator<V> findAfter(K key, long timestamp);
-
-    Iterator<V> findBefore(K key, long timestamp);
-
-    void put(K key, V value, long timestamp);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
deleted file mode 100644
index 46a2b9e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface WindowSupplier<K, V> {
-
-    String name();
-
-    Window<K, V> get();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java
deleted file mode 100644
index 54d44f0..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import java.util.Iterator;
-
-public abstract class FilteredIterator<T, S> implements Iterator<T> {
-
-    private Iterator<S> inner;
-    private T nextValue = null;
-
-    public FilteredIterator(Iterator<S> inner) {
-        this.inner = inner;
-
-        findNext();
-    }
-
-    @Override
-    public boolean hasNext() {
-        return nextValue != null;
-    }
-
-    @Override
-    public T next() {
-        T value = nextValue;
-        findNext();
-
-        return value;
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-    }
-
-    private void findNext() {
-        while (inner.hasNext()) {
-            S item = inner.next();
-            nextValue = filter(item);
-            if (nextValue != null) {
-                return;
-            }
-        }
-        nextValue = null;
-    }
-
-    protected abstract T filter(S item);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
deleted file mode 100644
index 06083b3..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.kstream.Predicate;
-
-class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
-
-    private final Predicate<K, V>[] predicates;
-
-    @SuppressWarnings("unchecked")
-    public KStreamBranch(Predicate<K, V> ... predicates) {
-        this.predicates = predicates;
-    }
-
-    @Override
-    public Processor<K, V> get() {
-        return new KStreamBranchProcessor();
-    }
-
-    private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
-        @Override
-        public void process(K key, V value) {
-            for (int i = 0; i < predicates.length; i++) {
-                if (predicates[i].test(key, value)) {
-                    // use forward with childIndex here and then break the loop
-                    // so that no record is going to be piped to multiple streams
-                    context().forward(key, value, i);
-                    break;
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
deleted file mode 100644
index 0b1f1e0..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {
-
-    private final Predicate<K, V> predicate;
-    private final boolean filterOut;
-
-    public KStreamFilter(Predicate<K, V> predicate, boolean filterOut) {
-        this.predicate = predicate;
-        this.filterOut = filterOut;
-    }
-
-    @Override
-    public Processor<K, V> get() {
-        return new KStreamFilterProcessor();
-    }
-
-    private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
-        @Override
-        public void process(K key, V value) {
-            if (filterOut ^ predicate.test(key, value)) {
-                context().forward(key, value);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
deleted file mode 100644
index 175a002..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
-
-    private final KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper;
-
-    KStreamFlatMap(KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper) {
-        this.mapper = mapper;
-    }
-
-    @Override
-    public Processor<K1, V1> get() {
-        return new KStreamFlatMapProcessor();
-    }
-
-    private class KStreamFlatMapProcessor extends AbstractProcessor<K1, V1> {
-        @Override
-        public void process(K1 key, V1 value) {
-            for (KeyValue<K2, V2> newPair : mapper.apply(key, value)) {
-                context().forward(newPair.key, newPair.value);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
deleted file mode 100644
index 9b4559b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamFlatMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {
-
-    private final ValueMapper<V1, ? extends Iterable<V2>> mapper;
-
-    KStreamFlatMapValues(ValueMapper<V1, ? extends Iterable<V2>> mapper) {
-        this.mapper = mapper;
-    }
-
-    @Override
-    public Processor<K1, V1> get() {
-        return new KStreamFlatMapValuesProcessor();
-    }
-
-    private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K1, V1> {
-        @Override
-        public void process(K1 key, V1 value) {
-            Iterable<V2> newValues = mapper.apply(value);
-            for (V2 v : newValues) {
-                context().forward(key, v);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
deleted file mode 100644
index 0986405..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamWindowed;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.lang.reflect.Array;
-import java.util.Collections;
-import java.util.Set;
-
-public class KStreamImpl<K, V> implements KStream<K, V> {
-
-    private static final String FILTER_NAME = "KAFKA-FILTER-";
-
-    private static final String MAP_NAME = "KAFKA-MAP-";
-
-    private static final String MAPVALUES_NAME = "KAFKA-MAPVALUES-";
-
-    private static final String FLATMAP_NAME = "KAFKA-FLATMAP-";
-
-    private static final String FLATMAPVALUES_NAME = "KAFKA-FLATMAPVALUES-";
-
-    private static final String TRANSFORM_NAME = "KAFKA-TRANSFORM-";
-
-    private static final String TRANSFORMVALUES_NAME = "KAFKA-TRANSFORMVALUES-";
-
-    private static final String PROCESSOR_NAME = "KAFKA-PROCESSOR-";
-
-    private static final String BRANCH_NAME = "KAFKA-BRANCH-";
-
-    private static final String BRANCHCHILD_NAME = "KAFKA-BRANCHCHILD-";
-
-    private static final String WINDOWED_NAME = "KAFKA-WINDOWED-";
-
-    private static final String SINK_NAME = "KAFKA-SINK-";
-
-    public static final String JOINTHIS_NAME = "KAFKA-JOINTHIS-";
-
-    public static final String JOINOTHER_NAME = "KAFKA-JOINOTHER-";
-
-    public static final String JOINMERGE_NAME = "KAFKA-JOINMERGE-";
-
-    public static final String SOURCE_NAME = "KAFKA-SOURCE-";
-
-    protected final KStreamBuilder topology;
-    protected final String name;
-    protected final Set<String> sourceNodes;
-
-    public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
-        this.topology = topology;
-        this.name = name;
-        this.sourceNodes = sourceNodes;
-    }
-
-    @Override
-    public KStream<K, V> filter(Predicate<K, V> predicate) {
-        String name = topology.newName(FILTER_NAME);
-
-        topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
-
-        return new KStreamImpl<>(topology, name, sourceNodes);
-    }
-
-    @Override
-    public KStream<K, V> filterOut(final Predicate<K, V> predicate) {
-        String name = topology.newName(FILTER_NAME);
-
-        topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
-
-        return new KStreamImpl<>(topology, name, sourceNodes);
-    }
-
-    @Override
-    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
-        String name = topology.newName(MAP_NAME);
-
-        topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
-
-        return new KStreamImpl<>(topology, name, null);
-    }
-
-    @Override
-    public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
-        String name = topology.newName(MAPVALUES_NAME);
-
-        topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
-
-        return new KStreamImpl<>(topology, name, sourceNodes);
-    }
-
-    @Override
-    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
-        String name = topology.newName(FLATMAP_NAME);
-
-        topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
-
-        return new KStreamImpl<>(topology, name, null);
-    }
-
-    @Override
-    public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> mapper) {
-        String name = topology.newName(FLATMAPVALUES_NAME);
-
-        topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
-
-        return new KStreamImpl<>(topology, name, sourceNodes);
-    }
-
-    @Override
-    public KStreamWindowed<K, V> with(WindowSupplier<K, V> windowSupplier) {
-        String name = topology.newName(WINDOWED_NAME);
-
-        topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name);
-
-        return new KStreamWindowedImpl<>(topology, name, sourceNodes, windowSupplier);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public KStream<K, V>[] branch(Predicate<K, V>... predicates) {
-        String branchName = topology.newName(BRANCH_NAME);
-
-        topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
-
-        KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
-        for (int i = 0; i < predicates.length; i++) {
-            String childName = topology.newName(BRANCHCHILD_NAME);
-
-            topology.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
-
-            branchChildren[i] = new KStreamImpl<>(topology, childName, sourceNodes);
-        }
-
-        return branchChildren;
-    }
-
-    @Override
-    public <K1, V1> KStream<K1, V1> through(String topic,
-                                            Serializer<K> keySerializer,
-                                            Serializer<V> valSerializer,
-                                            Deserializer<K1> keyDeserializer,
-                                            Deserializer<V1> valDeserializer) {
-        String sendName = topology.newName(SINK_NAME);
-
-        topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
-
-        String sourceName = topology.newName(SOURCE_NAME);
-
-        topology.addSource(sourceName, keyDeserializer, valDeserializer, topic);
-
-        return new KStreamImpl<>(topology, sourceName, Collections.singleton(sourceName));
-    }
-
-    @Override
-    public <K1, V1> KStream<K1, V1> through(String topic) {
-        return through(topic, (Serializer<K>) null, (Serializer<V>) null, (Deserializer<K1>) null, (Deserializer<V1>) null);
-    }
-
-    @Override
-    public void to(String topic) {
-        String name = topology.newName(SINK_NAME);
-
-        topology.addSink(name, topic, this.name);
-    }
-
-    @Override
-    public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
-        String name = topology.newName(SINK_NAME);
-
-        topology.addSink(name, topic, keySerializer, valSerializer, this.name);
-    }
-
-    @Override
-    public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
-        String name = topology.newName(TRANSFORM_NAME);
-
-        topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
-        topology.connectProcessorAndStateStores(name, stateStoreNames);
-
-        return new KStreamImpl<>(topology, name, null);
-    }
-
-    @Override
-    public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... stateStoreNames) {
-        String name = topology.newName(TRANSFORMVALUES_NAME);
-
-        topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
-        topology.connectProcessorAndStateStores(name, stateStoreNames);
-
-        return new KStreamImpl<>(topology, name, sourceNodes);
-    }
-
-    @Override
-    public void process(final ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames) {
-        String name = topology.newName(PROCESSOR_NAME);
-
-        topology.addProcessor(name, processorSupplier, this.name);
-        topology.connectProcessorAndStateStores(name, stateStoreNames);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
deleted file mode 100644
index 5e8186e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.util.Iterator;
-
-class KStreamJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> {
-
-    private static abstract class Finder<K, T> {
-        abstract Iterator<T> find(K key, long timestamp);
-    }
-
-    private final String windowName;
-    private final ValueJoiner<V1, V2, V> joiner;
-
-    KStreamJoin(String windowName, ValueJoiner<V1, V2, V> joiner) {
-        this.windowName = windowName;
-        this.joiner = joiner;
-    }
-
-    @Override
-    public Processor<K, V1> get() {
-        return new KStreamJoinProcessor(windowName);
-    }
-
-    private class KStreamJoinProcessor extends AbstractProcessor<K, V1> {
-
-        private final String windowName;
-        protected Finder<K, V2> finder;
-
-        public KStreamJoinProcessor(String windowName) {
-            this.windowName = windowName;
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void init(ProcessorContext context) {
-            super.init(context);
-
-            final Window<K, V2> window = (Window<K, V2>) context.getStateStore(windowName);
-
-            this.finder = new Finder<K, V2>() {
-                @Override
-                Iterator<V2> find(K key, long timestamp) {
-                    return window.find(key, timestamp);
-                }
-            };
-        }
-
-        @Override
-        public void process(K key, V1 value) {
-            long timestamp = context().timestamp();
-            Iterator<V2> iter = finder.find(key, timestamp);
-            if (iter != null) {
-                while (iter.hasNext()) {
-                    context().forward(key, joiner.apply(value, iter.next()));
-                }
-            }
-        }
-    }
-
-    public static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
-        return new ValueJoiner<T2, T1, R>() {
-            @Override
-            public R apply(T2 value2, T1 value1) {
-                return joiner.apply(value1, value2);
-            }
-        };
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
deleted file mode 100644
index 3868318..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
-
-    private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
-
-    public KStreamMap(KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
-        this.mapper = mapper;
-    }
-
-    @Override
-    public Processor<K1, V1> get() {
-        return new KStreamMapProcessor();
-    }
-
-    private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
-        @Override
-        public void process(K1 key, V1 value) {
-            KeyValue<K2, V2> newPair = mapper.apply(key, value);
-            context().forward(newPair.key, newPair.value);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
deleted file mode 100644
index 692b421..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {
-
-    private final ValueMapper<V1, V2> mapper;
-
-    public KStreamMapValues(ValueMapper<V1, V2> mapper) {
-        this.mapper = mapper;
-    }
-
-    @Override
-    public Processor<K1, V1> get() {
-        return new KStreamMapProcessor();
-    }
-
-    private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
-        @Override
-        public void process(K1 key, V1 value) {
-            V2 newValue = mapper.apply(value);
-            context().forward(key, newValue);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
deleted file mode 100644
index 59a815b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamPassThrough<K, V> implements ProcessorSupplier<K, V> {
-
-    @Override
-    public Processor<K, V> get() {
-        return new KStreamPassThroughProcessor<K, V>();
-    }
-
-    public class KStreamPassThroughProcessor<K, V> extends AbstractProcessor<K, V> {
-        @Override
-        public void process(K key, V value) {
-            context().forward(key, value);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
deleted file mode 100644
index 7ebab0e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-public class KStreamTransform<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
-
-    private final TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier;
-
-    public KStreamTransform(TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier) {
-        this.transformerSupplier = transformerSupplier;
-    }
-
-    @Override
-    public Processor<K1, V1> get() {
-        return new KStreamTransformProcessor(transformerSupplier.get());
-    }
-
-    public static class KStreamTransformProcessor<K1, V1, K2, V2> implements Processor<K1, V1> {
-
-        private final Transformer<K1, V1, KeyValue<K2, V2>> transformer;
-        private ProcessorContext context;
-
-        public KStreamTransformProcessor(Transformer<K1, V1, KeyValue<K2, V2>> transformer) {
-            this.transformer = transformer;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            transformer.init(context);
-            this.context = context;
-        }
-
-        @Override
-        public void process(K1 key, V1 value) {
-            KeyValue<K2, V2> pair = transformer.transform(key, value);
-            context.forward(pair.key, pair.value);
-        }
-
-        @Override
-        public void punctuate(long timestamp) {
-            transformer.punctuate(timestamp);
-        }
-
-        @Override
-        public void close() {
-            transformer.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
deleted file mode 100644
index 6f989e6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueTransformer;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> {
-
-    private final ValueTransformerSupplier<V, R> valueTransformerSupplier;
-
-    public KStreamTransformValues(ValueTransformerSupplier valueTransformerSupplier) {
-        this.valueTransformerSupplier = valueTransformerSupplier;
-    }
-
-    @Override
-    public Processor<K, V> get() {
-        return new KStreamTransformValuesProcessor(valueTransformerSupplier.get());
-    }
-
-    public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
-
-        private final ValueTransformer valueTransformer;
-        private ProcessorContext context;
-
-        public KStreamTransformValuesProcessor(ValueTransformer<V, R> valueTransformer) {
-            this.valueTransformer = valueTransformer;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            valueTransformer.init(context);
-            this.context = context;
-        }
-
-        @Override
-        public void process(K key, V value) {
-            context.forward(key, valueTransformer.transform(value));
-        }
-
-        @Override
-        public void punctuate(long timestamp) {
-            valueTransformer.punctuate(timestamp);
-        }
-
-        @Override
-        public void close() {
-            valueTransformer.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
deleted file mode 100644
index 2923936..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-public class KStreamWindow<K, V> implements ProcessorSupplier<K, V> {
-
-    private final WindowSupplier<K, V> windowSupplier;
-
-    KStreamWindow(WindowSupplier<K, V> windowSupplier) {
-        this.windowSupplier = windowSupplier;
-    }
-
-    public WindowSupplier<K, V> window() {
-        return windowSupplier;
-    }
-
-    @Override
-    public Processor<K, V> get() {
-        return new KStreamWindowProcessor();
-    }
-
-    private class KStreamWindowProcessor extends AbstractProcessor<K, V> {
-
-        private Window<K, V> window;
-
-        @Override
-        public void init(ProcessorContext context) {
-            super.init(context);
-            this.window = windowSupplier.get();
-            this.window.init(context);
-        }
-
-        @Override
-        public void process(K key, V value) {
-            synchronized (this) {
-                window.put(key, value, context().timestamp());
-                context().forward(key, value);
-            }
-        }
-
-        @Override
-        public void close() {
-            window.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
deleted file mode 100644
index cb49873..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStreamWindowed;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implements KStreamWindowed<K, V> {
-
-    private final WindowSupplier<K, V> windowSupplier;
-
-    public KStreamWindowedImpl(KStreamBuilder topology, String name, Set<String> sourceNodes, WindowSupplier<K, V> windowSupplier) {
-        super(topology, name, sourceNodes);
-        this.windowSupplier = windowSupplier;
-    }
-
-    @Override
-    public <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> valueJoiner) {
-        String thisWindowName = this.windowSupplier.name();
-        String otherWindowName = ((KStreamWindowedImpl<K, V1>) other).windowSupplier.name();
-        Set<String> thisSourceNodes = this.sourceNodes;
-        Set<String> otherSourceNodes = ((KStreamWindowedImpl<K, V1>) other).sourceNodes;
-
-        if (thisSourceNodes == null || otherSourceNodes == null)
-            throw new KafkaException("not joinable");
-
-        Set<String> allSourceNodes = new HashSet<>(sourceNodes);
-        allSourceNodes.addAll(((KStreamWindowedImpl<K, V1>) other).sourceNodes);
-
-        KStreamJoin<K, V2, V, V1> joinThis = new KStreamJoin<>(otherWindowName, valueJoiner);
-        KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner));
-        KStreamPassThrough<K, V2> joinMerge = new KStreamPassThrough<>();
-
-        String joinThisName = topology.newName(JOINTHIS_NAME);
-        String joinOtherName = topology.newName(JOINOTHER_NAME);
-        String joinMergeName = topology.newName(JOINMERGE_NAME);
-
-        topology.addProcessor(joinThisName, joinThis, this.name);
-        topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name);
-        topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.copartitionSources(allSourceNodes);
-
-        return new KStreamImpl<>(topology, joinMergeName, allSourceNodes);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java
deleted file mode 100644
index b54bcc9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.processor.internals.Stamped;
-
-import java.util.Iterator;
-
-public class WindowSupport {
-
-    public static class ValueList<V> {
-        Value<V> head = null;
-        Value<V> tail = null;
-        Value<V> dirty = null;
-
-        public void add(int slotNum, V value, long timestamp) {
-            Value<V> v = new Value<>(slotNum, value, timestamp);
-            if (tail != null) {
-                tail.next = v;
-            } else {
-                head = v;
-            }
-            tail = v;
-            if (dirty == null) dirty = v;
-        }
-
-        public Value<V> first() {
-            return head;
-        }
-
-        public void removeFirst() {
-            if (head != null) {
-                if (head == tail) tail = null;
-                head = head.next;
-            }
-        }
-
-        public boolean isEmpty() {
-            return head == null;
-        }
-
-        public boolean hasDirtyValues() {
-            return dirty != null;
-        }
-
-        public void clearDirtyValues() {
-            dirty = null;
-        }
-
-        public Iterator<Value<V>> iterator() {
-            return new ValueListIterator<V>(head);
-        }
-
-        public Iterator<Value<V>> dirtyValueIterator() {
-            return new ValueListIterator<V>(dirty);
-        }
-
-    }
-
-    private static class ValueListIterator<V> implements Iterator<Value<V>> {
-
-        Value<V> ptr;
-
-        ValueListIterator(Value<V> start) {
-            ptr = start;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return ptr != null;
-        }
-
-        @Override
-        public Value<V> next() {
-            Value<V> value = ptr;
-            if (value != null) ptr = value.next;
-            return value;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-    }
-
-    public static class Value<V> extends Stamped<V> {
-        public final int slotNum;
-        private Value<V> next = null;
-
-        Value(int slotNum, V value, long timestamp) {
-            super(value, timestamp);
-            this.slotNum = slotNum;
-        }
-    }
-
-
-    public static long getLong(byte[] bytes, int offset) {
-        long value = 0;
-        for (int i = 0; i < 8; i++) {
-            value = (value << 8) | bytes[offset + i];
-        }
-        return value;
-    }
-
-    public static int getInt(byte[] bytes, int offset) {
-        int value = 0;
-        for (int i = 0; i < 4; i++) {
-            value = (value << 8) | bytes[offset + i];
-        }
-        return value;
-    }
-
-    public static int putLong(byte[] bytes, int offset, long value) {
-        for (int i = 7; i >= 0; i--) {
-            bytes[offset + i] = (byte) (value & 0xFF);
-            value = value >> 8;
-        }
-        return 8;
-    }
-
-    public static int putInt(byte[] bytes, int offset, int value) {
-        for (int i = 3; i >= 0; i--) {
-            bytes[offset + i] = (byte) (value & 0xFF);
-            value = value >> 8;
-        }
-        return 4;
-    }
-
-    public static int puts(byte[] bytes, int offset, byte[] value) {
-        offset += putInt(bytes, offset, value.length);
-        System.arraycopy(bytes, offset, value, 0, value.length);
-        return 4 + value.length;
-    }
-
-
-    public static <T> T deserialize(byte[] bytes, int offset, int length, String topic, Deserializer<T> deserializer) {
-        byte[] buf = new byte[length];
-        System.arraycopy(bytes, offset, buf, 0, length);
-        return deserializer.deserialize(topic, buf);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
deleted file mode 100644
index 01d0024..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-/**
- * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op
- * implementations of {@link #punctuate(long)} and {@link #close()}.
- * 
- * @param <K> the type of keys
- * @param <V> the type of values
- */
-public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
-
-    private ProcessorContext context;
-
-    protected AbstractProcessor() {
-    }
-
-    @Override
-    public void init(ProcessorContext context) {
-        this.context = context;
-    }
-
-    /**
-     * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
-     * during {@link #init(ProcessorContext) initialization}.
-     * <p>
-     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
-     * </p>
-     * 
-     * @param streamTime the stream time when this method is being called
-     */
-    @Override
-    public void punctuate(long streamTime) {
-        // do nothing
-    }
-
-    /**
-     * Close this processor and clean up any resources.
-     * <p>
-     * This method does nothing by default; if desired, subclasses should override it with custom functionality.
-     * </p>
-     */
-    @Override
-    public void close() {
-        // do nothing
-    }
-
-    /**
-     * Get the processor's context set during {@link #init(ProcessorContext) initialization}.
-     * 
-     * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}.
-     */
-    protected final ProcessorContext context() {
-        return this.context;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
deleted file mode 100644
index 7d2188a..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class DefaultPartitionGrouper extends PartitionGrouper {
-
-    public Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata) {
-        Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
-
-        for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
-            Integer topicGroupId = entry.getKey();
-            Set<String> topicGroup = entry.getValue();
-
-            int maxNumPartitions = maxNumPartitions(metadata, topicGroup);
-
-            for (int partitionId = 0; partitionId < maxNumPartitions; partitionId++) {
-                Set<TopicPartition> group = new HashSet<>(topicGroup.size());
-
-                for (String topic : topicGroup) {
-                    if (partitionId < metadata.partitionsForTopic(topic).size()) {
-                        group.add(new TopicPartition(topic, partitionId));
-                    }
-                }
-                groups.put(new TaskId(topicGroupId, partitionId), Collections.unmodifiableSet(group));
-            }
-        }
-
-        return Collections.unmodifiableMap(groups);
-    }
-
-    protected int maxNumPartitions(Cluster metadata, Set<String> topics) {
-        int maxNumPartitions = 0;
-        for (String topic : topics) {
-            List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
-
-            if (infos == null)
-                throw new KafkaException("topic not found :" + topic);
-
-            int numPartitions = infos.size();
-            if (numPartitions > maxNumPartitions)
-                maxNumPartitions = numPartitions;
-        }
-        return maxNumPartitions;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
deleted file mode 100644
index 026ec89..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
-
-import java.util.Map;
-import java.util.Set;
-
-public abstract class PartitionGrouper {
-
-    protected Map<Integer, Set<String>> topicGroups;
-
-    private KafkaStreamingPartitionAssignor partitionAssignor = null;
-
-    /**
-     * Returns a map of task ids to groups of partitions.
-     *
-     * @param metadata
-     * @return a map of task ids to groups of partitions
-     */
-    public abstract Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata);
-
-    public void topicGroups(Map<Integer, Set<String>> topicGroups) {
-        this.topicGroups = topicGroups;
-    }
-
-    public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) {
-        this.partitionAssignor = partitionAssignor;
-    }
-
-    public Set<TaskId> taskIds(TopicPartition partition) {
-        return partitionAssignor.taskIds(partition);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
deleted file mode 100644
index 3cade3a..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-/**
- * A processor of messages.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- */
-public interface Processor<K, V> {
-
-    /**
-     * Initialize this processor with the given context. The framework ensures this is called once per processor when the topology
-     * that contains it is initialized.
-     * <p>
-     * If this processor is to be {@link #punctuate(long) called periodically} by the framework, then this method should
-     * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
-     * 
-     * @param context the context; may not be null
-     */
-    void init(ProcessorContext context);
-
-    /**
-     * Process the message with the given key and value.
-     * 
-     * @param key the key for the message
-     * @param value the value for the message
-     */
-    void process(K key, V value);
-
-    /**
-     * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
-     * during {@link #init(ProcessorContext) initialization}.
-     * 
-     * @param timestamp the stream time when this method is being called
-     */
-    void punctuate(long timestamp);
-
-    /**
-     * Close this processor and clean up any resources.
-     */
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
deleted file mode 100644
index 88ac64e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingMetrics;
-
-import java.io.File;
-
-public interface ProcessorContext {
-
-    /**
-     * Returns the task id
-     *
-     * @return the task id
-     */
-    TaskId id();
-
-    /**
-     * Returns the key serializer
-     *
-     * @return the key serializer
-     */
-    Serializer<?> keySerializer();
-
-    /**
-     * Returns the value serializer
-     *
-     * @return the value serializer
-     */
-    Serializer<?> valueSerializer();
-
-    /**
-     * Returns the key deserializer
-     *
-     * @return the key deserializer
-     */
-    Deserializer<?> keyDeserializer();
-
-    /**
-     * Returns the value deserializer
-     *
-     * @return the value deserializer
-     */
-    Deserializer<?> valueDeserializer();
-
-    /**
-     * Returns the state directory for the partition.
-     *
-     * @return the state directory
-     */
-    File stateDir();
-
-    /**
-     * Returns Metrics instance
-     *
-     * @return StreamingMetrics
-     */
-    StreamingMetrics metrics();
-
-    /**
-     * Registers and possibly restores the specified storage engine.
-     *
-     * @param store the storage engine
-     */
-    void register(StateStore store, StateRestoreCallback stateRestoreCallback);
-
-    StateStore getStateStore(String name);
-
-    void schedule(long interval);
-
-    <K, V> void forward(K key, V value);
-
-    <K, V> void forward(K key, V value, int childIndex);
-
-    void commit();
-
-    String topic();
-
-    int partition();
-
-    long offset();
-
-    long timestamp();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
deleted file mode 100644
index 719d3ac..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-public interface ProcessorSupplier<K, V> {
-
-    Processor<K, V> get();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
deleted file mode 100644
index 39decec..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-/**
- * Restoration logic for log-backed state stores upon restart,
- * it takes one record at a time from the logs to apply to the restoring state.
- */
-public interface StateRestoreCallback {
-
-    void restore(byte[] key, byte[] value);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
deleted file mode 100644
index 9c085a5..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-/**
- * A storage engine for managing state maintained by a stream processor.
- *
- * <p>
- * This interface does not specify any query capabilities, which, of course,
- * would be query engine specific. Instead it just specifies the minimum
- * functionality required to reload a storage engine from its changelog as well
- * as basic lifecycle management.
- * </p>
- */
-public interface StateStore {
-
-    /**
-     * The name of this store.
-     * @return the storage name
-     */
-    String name();
-
-    /**
-     * Initializes this state store
-     */
-    void init(ProcessorContext context);
-
-    /**
-     * Flush any cached data
-     */
-    void flush();
-
-    /**
-     * Close the storage engine
-     */
-    void close();
-
-    /**
-     * If the storage is persistent
-     */
-    boolean persistent();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
deleted file mode 100644
index 11545c5..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-public interface StateStoreSupplier {
-
-    String name();
-
-    StateStore get();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
deleted file mode 100644
index 3d474fe..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-public class TaskId {
-
-    public final int topicGroupId;
-    public final int partition;
-
-    public TaskId(int topicGroupId, int partition) {
-        this.topicGroupId = topicGroupId;
-        this.partition = partition;
-    }
-
-    public String toString() {
-        return topicGroupId + "_" + partition;
-    }
-
-    public static TaskId parse(String string) {
-        int index = string.indexOf('_');
-        if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException();
-
-        try {
-            int topicGroupId = Integer.parseInt(string.substring(0, index));
-            int partition = Integer.parseInt(string.substring(index + 1));
-
-            return new TaskId(topicGroupId, partition);
-        } catch (Exception e) {
-            throw new TaskIdFormatException();
-        }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o instanceof TaskId) {
-            TaskId other = (TaskId) o;
-            return other.topicGroupId == this.topicGroupId && other.partition == this.partition;
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        long n = ((long) topicGroupId << 32) | (long) partition;
-        return (int) (n % 0xFFFFFFFFL);
-    }
-
-    public static class TaskIdFormatException extends RuntimeException {
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
deleted file mode 100644
index 62098f2..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-/**
- * An interface that allows the KStream framework to extract a timestamp from a key-value pair
- */
-public interface TimestampExtractor {
-
-    /**
-     * Extracts a timestamp from a message
-     *
-     * @param record ConsumerRecord
-     * @return timestamp
-     */
-    long extract(ConsumerRecord<Object, Object> record);
-}