You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 01:27:33 UTC
[7/8] kafka git commit: MINOR: remove Kafka Streams in 0.9.0
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
deleted file mode 100644
index a1456f6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-
-import java.util.Iterator;
-
-public interface Window<K, V> extends StateStore {
-
- void init(ProcessorContext context);
-
- Iterator<V> find(K key, long timestamp);
-
- Iterator<V> findAfter(K key, long timestamp);
-
- Iterator<V> findBefore(K key, long timestamp);
-
- void put(K key, V value, long timestamp);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
deleted file mode 100644
index 46a2b9e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface WindowSupplier<K, V> {
-
- String name();
-
- Window<K, V> get();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java
deleted file mode 100644
index 54d44f0..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import java.util.Iterator;
-
-public abstract class FilteredIterator<T, S> implements Iterator<T> {
-
- private Iterator<S> inner;
- private T nextValue = null;
-
- public FilteredIterator(Iterator<S> inner) {
- this.inner = inner;
-
- findNext();
- }
-
- @Override
- public boolean hasNext() {
- return nextValue != null;
- }
-
- @Override
- public T next() {
- T value = nextValue;
- findNext();
-
- return value;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- private void findNext() {
- while (inner.hasNext()) {
- S item = inner.next();
- nextValue = filter(item);
- if (nextValue != null) {
- return;
- }
- }
- nextValue = null;
- }
-
- protected abstract T filter(S item);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
deleted file mode 100644
index 06083b3..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.kstream.Predicate;
-
-class KStreamBranch<K, V> implements ProcessorSupplier<K, V> {
-
- private final Predicate<K, V>[] predicates;
-
- @SuppressWarnings("unchecked")
- public KStreamBranch(Predicate<K, V> ... predicates) {
- this.predicates = predicates;
- }
-
- @Override
- public Processor<K, V> get() {
- return new KStreamBranchProcessor();
- }
-
- private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
- @Override
- public void process(K key, V value) {
- for (int i = 0; i < predicates.length; i++) {
- if (predicates[i].test(key, value)) {
- // use forward with childIndex here and then break the loop
- // so that no record is going to be piped to multiple streams
- context().forward(key, value, i);
- break;
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
deleted file mode 100644
index 0b1f1e0..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {
-
- private final Predicate<K, V> predicate;
- private final boolean filterOut;
-
- public KStreamFilter(Predicate<K, V> predicate, boolean filterOut) {
- this.predicate = predicate;
- this.filterOut = filterOut;
- }
-
- @Override
- public Processor<K, V> get() {
- return new KStreamFilterProcessor();
- }
-
- private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
- @Override
- public void process(K key, V value) {
- if (filterOut ^ predicate.test(key, value)) {
- context().forward(key, value);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
deleted file mode 100644
index 175a002..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
-
- private final KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper;
-
- KStreamFlatMap(KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper) {
- this.mapper = mapper;
- }
-
- @Override
- public Processor<K1, V1> get() {
- return new KStreamFlatMapProcessor();
- }
-
- private class KStreamFlatMapProcessor extends AbstractProcessor<K1, V1> {
- @Override
- public void process(K1 key, V1 value) {
- for (KeyValue<K2, V2> newPair : mapper.apply(key, value)) {
- context().forward(newPair.key, newPair.value);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
deleted file mode 100644
index 9b4559b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamFlatMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {
-
- private final ValueMapper<V1, ? extends Iterable<V2>> mapper;
-
- KStreamFlatMapValues(ValueMapper<V1, ? extends Iterable<V2>> mapper) {
- this.mapper = mapper;
- }
-
- @Override
- public Processor<K1, V1> get() {
- return new KStreamFlatMapValuesProcessor();
- }
-
- private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K1, V1> {
- @Override
- public void process(K1 key, V1 value) {
- Iterable<V2> newValues = mapper.apply(value);
- for (V2 v : newValues) {
- context().forward(key, v);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
deleted file mode 100644
index 0986405..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamWindowed;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.lang.reflect.Array;
-import java.util.Collections;
-import java.util.Set;
-
-public class KStreamImpl<K, V> implements KStream<K, V> {
-
- private static final String FILTER_NAME = "KAFKA-FILTER-";
-
- private static final String MAP_NAME = "KAFKA-MAP-";
-
- private static final String MAPVALUES_NAME = "KAFKA-MAPVALUES-";
-
- private static final String FLATMAP_NAME = "KAFKA-FLATMAP-";
-
- private static final String FLATMAPVALUES_NAME = "KAFKA-FLATMAPVALUES-";
-
- private static final String TRANSFORM_NAME = "KAFKA-TRANSFORM-";
-
- private static final String TRANSFORMVALUES_NAME = "KAFKA-TRANSFORMVALUES-";
-
- private static final String PROCESSOR_NAME = "KAFKA-PROCESSOR-";
-
- private static final String BRANCH_NAME = "KAFKA-BRANCH-";
-
- private static final String BRANCHCHILD_NAME = "KAFKA-BRANCHCHILD-";
-
- private static final String WINDOWED_NAME = "KAFKA-WINDOWED-";
-
- private static final String SINK_NAME = "KAFKA-SINK-";
-
- public static final String JOINTHIS_NAME = "KAFKA-JOINTHIS-";
-
- public static final String JOINOTHER_NAME = "KAFKA-JOINOTHER-";
-
- public static final String JOINMERGE_NAME = "KAFKA-JOINMERGE-";
-
- public static final String SOURCE_NAME = "KAFKA-SOURCE-";
-
- protected final KStreamBuilder topology;
- protected final String name;
- protected final Set<String> sourceNodes;
-
- public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
- this.topology = topology;
- this.name = name;
- this.sourceNodes = sourceNodes;
- }
-
- @Override
- public KStream<K, V> filter(Predicate<K, V> predicate) {
- String name = topology.newName(FILTER_NAME);
-
- topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
-
- return new KStreamImpl<>(topology, name, sourceNodes);
- }
-
- @Override
- public KStream<K, V> filterOut(final Predicate<K, V> predicate) {
- String name = topology.newName(FILTER_NAME);
-
- topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
-
- return new KStreamImpl<>(topology, name, sourceNodes);
- }
-
- @Override
- public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
- String name = topology.newName(MAP_NAME);
-
- topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
-
- return new KStreamImpl<>(topology, name, null);
- }
-
- @Override
- public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
- String name = topology.newName(MAPVALUES_NAME);
-
- topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
-
- return new KStreamImpl<>(topology, name, sourceNodes);
- }
-
- @Override
- public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
- String name = topology.newName(FLATMAP_NAME);
-
- topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
-
- return new KStreamImpl<>(topology, name, null);
- }
-
- @Override
- public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> mapper) {
- String name = topology.newName(FLATMAPVALUES_NAME);
-
- topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
-
- return new KStreamImpl<>(topology, name, sourceNodes);
- }
-
- @Override
- public KStreamWindowed<K, V> with(WindowSupplier<K, V> windowSupplier) {
- String name = topology.newName(WINDOWED_NAME);
-
- topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name);
-
- return new KStreamWindowedImpl<>(topology, name, sourceNodes, windowSupplier);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public KStream<K, V>[] branch(Predicate<K, V>... predicates) {
- String branchName = topology.newName(BRANCH_NAME);
-
- topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
-
- KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
- for (int i = 0; i < predicates.length; i++) {
- String childName = topology.newName(BRANCHCHILD_NAME);
-
- topology.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
-
- branchChildren[i] = new KStreamImpl<>(topology, childName, sourceNodes);
- }
-
- return branchChildren;
- }
-
- @Override
- public <K1, V1> KStream<K1, V1> through(String topic,
- Serializer<K> keySerializer,
- Serializer<V> valSerializer,
- Deserializer<K1> keyDeserializer,
- Deserializer<V1> valDeserializer) {
- String sendName = topology.newName(SINK_NAME);
-
- topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
-
- String sourceName = topology.newName(SOURCE_NAME);
-
- topology.addSource(sourceName, keyDeserializer, valDeserializer, topic);
-
- return new KStreamImpl<>(topology, sourceName, Collections.singleton(sourceName));
- }
-
- @Override
- public <K1, V1> KStream<K1, V1> through(String topic) {
- return through(topic, (Serializer<K>) null, (Serializer<V>) null, (Deserializer<K1>) null, (Deserializer<V1>) null);
- }
-
- @Override
- public void to(String topic) {
- String name = topology.newName(SINK_NAME);
-
- topology.addSink(name, topic, this.name);
- }
-
- @Override
- public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
- String name = topology.newName(SINK_NAME);
-
- topology.addSink(name, topic, keySerializer, valSerializer, this.name);
- }
-
- @Override
- public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
- String name = topology.newName(TRANSFORM_NAME);
-
- topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
- topology.connectProcessorAndStateStores(name, stateStoreNames);
-
- return new KStreamImpl<>(topology, name, null);
- }
-
- @Override
- public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... stateStoreNames) {
- String name = topology.newName(TRANSFORMVALUES_NAME);
-
- topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
- topology.connectProcessorAndStateStores(name, stateStoreNames);
-
- return new KStreamImpl<>(topology, name, sourceNodes);
- }
-
- @Override
- public void process(final ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames) {
- String name = topology.newName(PROCESSOR_NAME);
-
- topology.addProcessor(name, processorSupplier, this.name);
- topology.connectProcessorAndStateStores(name, stateStoreNames);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
deleted file mode 100644
index 5e8186e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.util.Iterator;
-
-class KStreamJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> {
-
- private static abstract class Finder<K, T> {
- abstract Iterator<T> find(K key, long timestamp);
- }
-
- private final String windowName;
- private final ValueJoiner<V1, V2, V> joiner;
-
- KStreamJoin(String windowName, ValueJoiner<V1, V2, V> joiner) {
- this.windowName = windowName;
- this.joiner = joiner;
- }
-
- @Override
- public Processor<K, V1> get() {
- return new KStreamJoinProcessor(windowName);
- }
-
- private class KStreamJoinProcessor extends AbstractProcessor<K, V1> {
-
- private final String windowName;
- protected Finder<K, V2> finder;
-
- public KStreamJoinProcessor(String windowName) {
- this.windowName = windowName;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(ProcessorContext context) {
- super.init(context);
-
- final Window<K, V2> window = (Window<K, V2>) context.getStateStore(windowName);
-
- this.finder = new Finder<K, V2>() {
- @Override
- Iterator<V2> find(K key, long timestamp) {
- return window.find(key, timestamp);
- }
- };
- }
-
- @Override
- public void process(K key, V1 value) {
- long timestamp = context().timestamp();
- Iterator<V2> iter = finder.find(key, timestamp);
- if (iter != null) {
- while (iter.hasNext()) {
- context().forward(key, joiner.apply(value, iter.next()));
- }
- }
- }
- }
-
- public static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
- return new ValueJoiner<T2, T1, R>() {
- @Override
- public R apply(T2 value2, T1 value1) {
- return joiner.apply(value1, value2);
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
deleted file mode 100644
index 3868318..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
-
- private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
-
- public KStreamMap(KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
- this.mapper = mapper;
- }
-
- @Override
- public Processor<K1, V1> get() {
- return new KStreamMapProcessor();
- }
-
- private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
- @Override
- public void process(K1 key, V1 value) {
- KeyValue<K2, V2> newPair = mapper.apply(key, value);
- context().forward(newPair.key, newPair.value);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
deleted file mode 100644
index 692b421..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {
-
- private final ValueMapper<V1, V2> mapper;
-
- public KStreamMapValues(ValueMapper<V1, V2> mapper) {
- this.mapper = mapper;
- }
-
- @Override
- public Processor<K1, V1> get() {
- return new KStreamMapProcessor();
- }
-
- private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
- @Override
- public void process(K1 key, V1 value) {
- V2 newValue = mapper.apply(value);
- context().forward(key, newValue);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
deleted file mode 100644
index 59a815b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamPassThrough<K, V> implements ProcessorSupplier<K, V> {
-
- @Override
- public Processor<K, V> get() {
- return new KStreamPassThroughProcessor<K, V>();
- }
-
- public class KStreamPassThroughProcessor<K, V> extends AbstractProcessor<K, V> {
- @Override
- public void process(K key, V value) {
- context().forward(key, value);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
deleted file mode 100644
index 7ebab0e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-public class KStreamTransform<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
-
- private final TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier;
-
- public KStreamTransform(TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier) {
- this.transformerSupplier = transformerSupplier;
- }
-
- @Override
- public Processor<K1, V1> get() {
- return new KStreamTransformProcessor(transformerSupplier.get());
- }
-
- public static class KStreamTransformProcessor<K1, V1, K2, V2> implements Processor<K1, V1> {
-
- private final Transformer<K1, V1, KeyValue<K2, V2>> transformer;
- private ProcessorContext context;
-
- public KStreamTransformProcessor(Transformer<K1, V1, KeyValue<K2, V2>> transformer) {
- this.transformer = transformer;
- }
-
- @Override
- public void init(ProcessorContext context) {
- transformer.init(context);
- this.context = context;
- }
-
- @Override
- public void process(K1 key, V1 value) {
- KeyValue<K2, V2> pair = transformer.transform(key, value);
- context.forward(pair.key, pair.value);
- }
-
- @Override
- public void punctuate(long timestamp) {
- transformer.punctuate(timestamp);
- }
-
- @Override
- public void close() {
- transformer.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
deleted file mode 100644
index 6f989e6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueTransformer;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V> {
-
- private final ValueTransformerSupplier<V, R> valueTransformerSupplier;
-
- public KStreamTransformValues(ValueTransformerSupplier valueTransformerSupplier) {
- this.valueTransformerSupplier = valueTransformerSupplier;
- }
-
- @Override
- public Processor<K, V> get() {
- return new KStreamTransformValuesProcessor(valueTransformerSupplier.get());
- }
-
- public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
-
- private final ValueTransformer valueTransformer;
- private ProcessorContext context;
-
- public KStreamTransformValuesProcessor(ValueTransformer<V, R> valueTransformer) {
- this.valueTransformer = valueTransformer;
- }
-
- @Override
- public void init(ProcessorContext context) {
- valueTransformer.init(context);
- this.context = context;
- }
-
- @Override
- public void process(K key, V value) {
- context.forward(key, valueTransformer.transform(value));
- }
-
- @Override
- public void punctuate(long timestamp) {
- valueTransformer.punctuate(timestamp);
- }
-
- @Override
- public void close() {
- valueTransformer.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
deleted file mode 100644
index 2923936..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-public class KStreamWindow<K, V> implements ProcessorSupplier<K, V> {
-
- private final WindowSupplier<K, V> windowSupplier;
-
- KStreamWindow(WindowSupplier<K, V> windowSupplier) {
- this.windowSupplier = windowSupplier;
- }
-
- public WindowSupplier<K, V> window() {
- return windowSupplier;
- }
-
- @Override
- public Processor<K, V> get() {
- return new KStreamWindowProcessor();
- }
-
- private class KStreamWindowProcessor extends AbstractProcessor<K, V> {
-
- private Window<K, V> window;
-
- @Override
- public void init(ProcessorContext context) {
- super.init(context);
- this.window = windowSupplier.get();
- this.window.init(context);
- }
-
- @Override
- public void process(K key, V value) {
- synchronized (this) {
- window.put(key, value, context().timestamp());
- context().forward(key, value);
- }
- }
-
- @Override
- public void close() {
- window.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
deleted file mode 100644
index cb49873..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KStreamWindowed;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implements KStreamWindowed<K, V> {
-
- private final WindowSupplier<K, V> windowSupplier;
-
- public KStreamWindowedImpl(KStreamBuilder topology, String name, Set<String> sourceNodes, WindowSupplier<K, V> windowSupplier) {
- super(topology, name, sourceNodes);
- this.windowSupplier = windowSupplier;
- }
-
- @Override
- public <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> valueJoiner) {
- String thisWindowName = this.windowSupplier.name();
- String otherWindowName = ((KStreamWindowedImpl<K, V1>) other).windowSupplier.name();
- Set<String> thisSourceNodes = this.sourceNodes;
- Set<String> otherSourceNodes = ((KStreamWindowedImpl<K, V1>) other).sourceNodes;
-
- if (thisSourceNodes == null || otherSourceNodes == null)
- throw new KafkaException("not joinable");
-
- Set<String> allSourceNodes = new HashSet<>(sourceNodes);
- allSourceNodes.addAll(((KStreamWindowedImpl<K, V1>) other).sourceNodes);
-
- KStreamJoin<K, V2, V, V1> joinThis = new KStreamJoin<>(otherWindowName, valueJoiner);
- KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner));
- KStreamPassThrough<K, V2> joinMerge = new KStreamPassThrough<>();
-
- String joinThisName = topology.newName(JOINTHIS_NAME);
- String joinOtherName = topology.newName(JOINOTHER_NAME);
- String joinMergeName = topology.newName(JOINMERGE_NAME);
-
- topology.addProcessor(joinThisName, joinThis, this.name);
- topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name);
- topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
- topology.copartitionSources(allSourceNodes);
-
- return new KStreamImpl<>(topology, joinMergeName, allSourceNodes);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java
deleted file mode 100644
index b54bcc9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.processor.internals.Stamped;
-
-import java.util.Iterator;
-
-public class WindowSupport {
-
- public static class ValueList<V> {
- Value<V> head = null;
- Value<V> tail = null;
- Value<V> dirty = null;
-
- public void add(int slotNum, V value, long timestamp) {
- Value<V> v = new Value<>(slotNum, value, timestamp);
- if (tail != null) {
- tail.next = v;
- } else {
- head = v;
- }
- tail = v;
- if (dirty == null) dirty = v;
- }
-
- public Value<V> first() {
- return head;
- }
-
- public void removeFirst() {
- if (head != null) {
- if (head == tail) tail = null;
- head = head.next;
- }
- }
-
- public boolean isEmpty() {
- return head == null;
- }
-
- public boolean hasDirtyValues() {
- return dirty != null;
- }
-
- public void clearDirtyValues() {
- dirty = null;
- }
-
- public Iterator<Value<V>> iterator() {
- return new ValueListIterator<V>(head);
- }
-
- public Iterator<Value<V>> dirtyValueIterator() {
- return new ValueListIterator<V>(dirty);
- }
-
- }
-
- private static class ValueListIterator<V> implements Iterator<Value<V>> {
-
- Value<V> ptr;
-
- ValueListIterator(Value<V> start) {
- ptr = start;
- }
-
- @Override
- public boolean hasNext() {
- return ptr != null;
- }
-
- @Override
- public Value<V> next() {
- Value<V> value = ptr;
- if (value != null) ptr = value.next;
- return value;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- }
-
- public static class Value<V> extends Stamped<V> {
- public final int slotNum;
- private Value<V> next = null;
-
- Value(int slotNum, V value, long timestamp) {
- super(value, timestamp);
- this.slotNum = slotNum;
- }
- }
-
-
- public static long getLong(byte[] bytes, int offset) {
- long value = 0;
- for (int i = 0; i < 8; i++) {
- value = (value << 8) | bytes[offset + i];
- }
- return value;
- }
-
- public static int getInt(byte[] bytes, int offset) {
- int value = 0;
- for (int i = 0; i < 4; i++) {
- value = (value << 8) | bytes[offset + i];
- }
- return value;
- }
-
- public static int putLong(byte[] bytes, int offset, long value) {
- for (int i = 7; i >= 0; i--) {
- bytes[offset + i] = (byte) (value & 0xFF);
- value = value >> 8;
- }
- return 8;
- }
-
- public static int putInt(byte[] bytes, int offset, int value) {
- for (int i = 3; i >= 0; i--) {
- bytes[offset + i] = (byte) (value & 0xFF);
- value = value >> 8;
- }
- return 4;
- }
-
- public static int puts(byte[] bytes, int offset, byte[] value) {
- offset += putInt(bytes, offset, value.length);
- System.arraycopy(bytes, offset, value, 0, value.length);
- return 4 + value.length;
- }
-
-
- public static <T> T deserialize(byte[] bytes, int offset, int length, String topic, Deserializer<T> deserializer) {
- byte[] buf = new byte[length];
- System.arraycopy(bytes, offset, buf, 0, length);
- return deserializer.deserialize(topic, buf);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
deleted file mode 100644
index 01d0024..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-/**
- * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op
- * implementations of {@link #punctuate(long)} and {@link #close()}.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- */
-public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
-
- private ProcessorContext context;
-
- protected AbstractProcessor() {
- }
-
- @Override
- public void init(ProcessorContext context) {
- this.context = context;
- }
-
- /**
- * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
- * during {@link #init(ProcessorContext) initialization}.
- * <p>
- * This method does nothing by default; if desired, subclasses should override it with custom functionality.
- * </p>
- *
- * @param streamTime the stream time when this method is being called
- */
- @Override
- public void punctuate(long streamTime) {
- // do nothing
- }
-
- /**
- * Close this processor and clean up any resources.
- * <p>
- * This method does nothing by default; if desired, subclasses should override it with custom functionality.
- * </p>
- */
- @Override
- public void close() {
- // do nothing
- }
-
- /**
- * Get the processor's context set during {@link #init(ProcessorContext) initialization}.
- *
- * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}.
- */
- protected final ProcessorContext context() {
- return this.context;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
deleted file mode 100644
index 7d2188a..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class DefaultPartitionGrouper extends PartitionGrouper {
-
- public Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata) {
- Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
-
- for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
- Integer topicGroupId = entry.getKey();
- Set<String> topicGroup = entry.getValue();
-
- int maxNumPartitions = maxNumPartitions(metadata, topicGroup);
-
- for (int partitionId = 0; partitionId < maxNumPartitions; partitionId++) {
- Set<TopicPartition> group = new HashSet<>(topicGroup.size());
-
- for (String topic : topicGroup) {
- if (partitionId < metadata.partitionsForTopic(topic).size()) {
- group.add(new TopicPartition(topic, partitionId));
- }
- }
- groups.put(new TaskId(topicGroupId, partitionId), Collections.unmodifiableSet(group));
- }
- }
-
- return Collections.unmodifiableMap(groups);
- }
-
- protected int maxNumPartitions(Cluster metadata, Set<String> topics) {
- int maxNumPartitions = 0;
- for (String topic : topics) {
- List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
-
- if (infos == null)
- throw new KafkaException("topic not found :" + topic);
-
- int numPartitions = infos.size();
- if (numPartitions > maxNumPartitions)
- maxNumPartitions = numPartitions;
- }
- return maxNumPartitions;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
deleted file mode 100644
index 026ec89..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
-
-import java.util.Map;
-import java.util.Set;
-
-public abstract class PartitionGrouper {
-
- protected Map<Integer, Set<String>> topicGroups;
-
- private KafkaStreamingPartitionAssignor partitionAssignor = null;
-
- /**
- * Returns a map of task ids to groups of partitions.
- *
- * @param metadata
- * @return a map of task ids to groups of partitions
- */
- public abstract Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata);
-
- public void topicGroups(Map<Integer, Set<String>> topicGroups) {
- this.topicGroups = topicGroups;
- }
-
- public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) {
- this.partitionAssignor = partitionAssignor;
- }
-
- public Set<TaskId> taskIds(TopicPartition partition) {
- return partitionAssignor.taskIds(partition);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
deleted file mode 100644
index 3cade3a..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-/**
- * A processor of messages.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- */
-public interface Processor<K, V> {
-
- /**
- * Initialize this processor with the given context. The framework ensures this is called once per processor when the topology
- * that contains it is initialized.
- * <p>
- * If this processor is to be {@link #punctuate(long) called periodically} by the framework, then this method should
- * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
- *
- * @param context the context; may not be null
- */
- void init(ProcessorContext context);
-
- /**
- * Process the message with the given key and value.
- *
- * @param key the key for the message
- * @param value the value for the message
- */
- void process(K key, V value);
-
- /**
- * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
- * during {@link #init(ProcessorContext) initialization}.
- *
- * @param timestamp the stream time when this method is being called
- */
- void punctuate(long timestamp);
-
- /**
- * Close this processor and clean up any resources.
- */
- void close();
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
deleted file mode 100644
index 88ac64e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingMetrics;
-
-import java.io.File;
-
-public interface ProcessorContext {
-
- /**
- * Returns the task id
- *
- * @return the task id
- */
- TaskId id();
-
- /**
- * Returns the key serializer
- *
- * @return the key serializer
- */
- Serializer<?> keySerializer();
-
- /**
- * Returns the value serializer
- *
- * @return the value serializer
- */
- Serializer<?> valueSerializer();
-
- /**
- * Returns the key deserializer
- *
- * @return the key deserializer
- */
- Deserializer<?> keyDeserializer();
-
- /**
- * Returns the value deserializer
- *
- * @return the value deserializer
- */
- Deserializer<?> valueDeserializer();
-
- /**
- * Returns the state directory for the partition.
- *
- * @return the state directory
- */
- File stateDir();
-
- /**
- * Returns Metrics instance
- *
- * @return StreamingMetrics
- */
- StreamingMetrics metrics();
-
- /**
- * Registers and possibly restores the specified storage engine.
- *
- * @param store the storage engine
- */
- void register(StateStore store, StateRestoreCallback stateRestoreCallback);
-
- StateStore getStateStore(String name);
-
- void schedule(long interval);
-
- <K, V> void forward(K key, V value);
-
- <K, V> void forward(K key, V value, int childIndex);
-
- void commit();
-
- String topic();
-
- int partition();
-
- long offset();
-
- long timestamp();
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
deleted file mode 100644
index 719d3ac..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-public interface ProcessorSupplier<K, V> {
-
- Processor<K, V> get();
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
deleted file mode 100644
index 39decec..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-/**
- * Restoration logic for log-backed state stores upon restart,
- * it takes one record at a time from the logs to apply to the restoring state.
- */
-public interface StateRestoreCallback {
-
- void restore(byte[] key, byte[] value);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
deleted file mode 100644
index 9c085a5..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-/**
- * A storage engine for managing state maintained by a stream processor.
- *
- * <p>
- * This interface does not specify any query capabilities, which, of course,
- * would be query engine specific. Instead it just specifies the minimum
- * functionality required to reload a storage engine from its changelog as well
- * as basic lifecycle management.
- * </p>
- */
-public interface StateStore {
-
- /**
- * The name of this store.
- * @return the storage name
- */
- String name();
-
- /**
- * Initializes this state store
- */
- void init(ProcessorContext context);
-
- /**
- * Flush any cached data
- */
- void flush();
-
- /**
- * Close the storage engine
- */
- void close();
-
- /**
- * If the storage is persistent
- */
- boolean persistent();
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
deleted file mode 100644
index 11545c5..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-public interface StateStoreSupplier {
-
- String name();
-
- StateStore get();
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
deleted file mode 100644
index 3d474fe..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-public class TaskId {
-
- public final int topicGroupId;
- public final int partition;
-
- public TaskId(int topicGroupId, int partition) {
- this.topicGroupId = topicGroupId;
- this.partition = partition;
- }
-
- public String toString() {
- return topicGroupId + "_" + partition;
- }
-
- public static TaskId parse(String string) {
- int index = string.indexOf('_');
- if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException();
-
- try {
- int topicGroupId = Integer.parseInt(string.substring(0, index));
- int partition = Integer.parseInt(string.substring(index + 1));
-
- return new TaskId(topicGroupId, partition);
- } catch (Exception e) {
- throw new TaskIdFormatException();
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof TaskId) {
- TaskId other = (TaskId) o;
- return other.topicGroupId == this.topicGroupId && other.partition == this.partition;
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- long n = ((long) topicGroupId << 32) | (long) partition;
- return (int) (n % 0xFFFFFFFFL);
- }
-
- public static class TaskIdFormatException extends RuntimeException {
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
deleted file mode 100644
index 62098f2..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-/**
- * An interface that allows the KStream framework to extract a timestamp from a key-value pair
- */
-public interface TimestampExtractor {
-
- /**
- * Extracts a timestamp from a message
- *
- * @param record ConsumerRecord
- * @return timestamp
- */
- long extract(ConsumerRecord<Object, Object> record);
-}