You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/18 21:14:56 UTC
[2/2] kafka git commit: KAFKA-3104: add windowed aggregation to
KStream
KAFKA-3104: add windowed aggregation to KStream
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Mastuda
Closes #781 from guozhangwang/K3104
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a62eb599
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a62eb599
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a62eb599
Branch: refs/heads/trunk
Commit: a62eb5993f5517a64dd1020b0a9bbd1012f7ee67
Parents: cc3570d
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Jan 18 12:14:43 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jan 18 12:14:43 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/kstream/HoppingWindows.java | 23 +-
.../kafka/streams/kstream/JoinWindows.java | 33 +--
.../apache/kafka/streams/kstream/KStream.java | 18 +-
.../apache/kafka/streams/kstream/KTable.java | 18 +-
.../kafka/streams/kstream/SlidingWindows.java | 67 ------
.../kafka/streams/kstream/TumblingWindows.java | 68 ++++++
.../kafka/streams/kstream/UnlimitedWindows.java | 8 +-
.../apache/kafka/streams/kstream/Window.java | 19 ++
.../apache/kafka/streams/kstream/Windowed.java | 5 +
.../apache/kafka/streams/kstream/Windows.java | 27 ++-
.../kstream/internals/KStreamAggWindow.java | 51 ++++
.../kstream/internals/KStreamAggregate.java | 171 +++++++++++++
.../kstream/internals/KStreamFlatMap.java | 14 +-
.../kstream/internals/KStreamFlatMapValues.java | 16 +-
.../streams/kstream/internals/KStreamImpl.java | 49 ++--
.../kstream/internals/KStreamJoinWindow.java | 11 +-
.../kstream/internals/KStreamKStreamJoin.java | 15 +-
.../internals/KStreamKTableLeftJoin.java | 6 +-
.../streams/kstream/internals/KStreamMap.java | 14 +-
.../kstream/internals/KStreamMapValues.java | 14 +-
.../kstream/internals/KStreamTransform.java | 8 +-
.../internals/KTableKTableAbstractJoin.java | 6 +-
.../kstream/internals/KTableKTableJoin.java | 20 +-
.../kstream/internals/KTableKTableLeftJoin.java | 18 +-
.../internals/KTableKTableOuterJoin.java | 20 +-
.../internals/KTableKTableRightJoin.java | 18 +-
.../kstream/internals/KTableMapValues.java | 38 +--
.../kstream/internals/KTableRepartitionMap.java | 38 +--
.../kstream/internals/SlidingWindow.java | 38 ---
.../kstream/internals/TumblingWindow.java | 38 +++
.../kafka/streams/state/MeteredWindowStore.java | 18 +-
.../kafka/streams/state/RocksDBStore.java | 1 +
.../kafka/streams/state/RocksDBWindowStore.java | 44 ++--
.../state/RocksDBWindowStoreSupplier.java | 10 +-
.../apache/kafka/streams/state/WindowStore.java | 6 +-
.../streams/state/WindowStoreIterator.java | 4 +-
.../kstream/internals/KStreamAggregateTest.java | 154 ++++++++++++
.../internals/KStreamKStreamJoinTest.java | 6 +-
.../kstream/internals/KTableAggregateTest.java | 18 +-
.../streams/kstream/internals/WindowsTest.java | 70 ++++++
.../streams/state/RocksDBWindowStoreTest.java | 239 +++++++++----------
41 files changed, 1000 insertions(+), 459 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
index d7141eb..f354ef9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.HoppingWindow;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
public class HoppingWindows extends Windows<HoppingWindow> {
@@ -62,9 +62,22 @@ public class HoppingWindows extends Windows<HoppingWindow> {
}
@Override
- public Collection<HoppingWindow> windowsFor(long timestamp) {
- // TODO
- return Collections.<HoppingWindow>emptyList();
+ public Map<Long, HoppingWindow> windowsFor(long timestamp) {
+ long enclosed = (size - 1) / period;
+
+ long windowStart = Math.max(0, timestamp - timestamp % period - enclosed * period);
+
+ Map<Long, HoppingWindow> windows = new HashMap<>();
+ while (windowStart <= timestamp) {
+ // add the window
+ HoppingWindow window = new HoppingWindow(windowStart, windowStart + this.size);
+ windows.put(windowStart, window);
+
+ // advance the step period
+ windowStart += this.period;
+ }
+
+ return windows;
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 50aff9d..ffc1c1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -18,31 +18,27 @@
package org.apache.kafka.streams.kstream;
-import org.apache.kafka.streams.kstream.internals.SlidingWindow;
+import org.apache.kafka.streams.kstream.internals.TumblingWindow;
-import java.util.Collection;
+import java.util.Map;
/**
* This class is used to specify the behaviour of windowed joins.
*/
-public class JoinWindows extends Windows<SlidingWindow> {
-
- private static final int DEFAULT_NUM_SEGMENTS = 3;
+public class JoinWindows extends Windows<TumblingWindow> {
public final long before;
public final long after;
- public final int segments;
- private JoinWindows(String name, long before, long after, int segments) {
+ private JoinWindows(String name, long before, long after) {
super(name);
this.after = after;
this.before = before;
- this.segments = segments;
}
public static JoinWindows of(String name) {
- return new JoinWindows(name, 0L, 0L, DEFAULT_NUM_SEGMENTS);
+ return new JoinWindows(name, 0L, 0L);
}
/**
@@ -53,7 +49,7 @@ public class JoinWindows extends Windows<SlidingWindow> {
* @return
*/
public JoinWindows within(long timeDifference) {
- return new JoinWindows(this.name, timeDifference, timeDifference, this.segments);
+ return new JoinWindows(this.name, timeDifference, timeDifference);
}
/**
@@ -65,7 +61,7 @@ public class JoinWindows extends Windows<SlidingWindow> {
* @return
*/
public JoinWindows before(long timeDifference) {
- return new JoinWindows(this.name, timeDifference, this.after, this.segments);
+ return new JoinWindows(this.name, timeDifference, this.after);
}
/**
@@ -77,22 +73,11 @@ public class JoinWindows extends Windows<SlidingWindow> {
* @return
*/
public JoinWindows after(long timeDifference) {
- return new JoinWindows(this.name, this.before, timeDifference, this.segments);
- }
-
- /**
- * Specifies the number of segments to be used for rolling the window store,
- * this function is not exposed to users but can be called by developers that extend this JoinWindows specs
- *
- * @param segments
- * @return
- */
- protected JoinWindows segments(int segments) {
- return new JoinWindows(name, before, after, segments);
+ return new JoinWindows(this.name, this.before, timeDifference);
}
@Override
- public Collection<SlidingWindow> windowsFor(long timestamp) {
+ public Map<Long, TumblingWindow> windowsFor(long timestamp) {
// this function should never be called
throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index dace7e0..85d51e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -185,11 +185,11 @@ public interface KStream<K, V> {
* @param otherValueDeserializer value deserializer for other stream,
* if not specified the default serializer defined in the configs will be used
* @param <V1> the value type of the other stream
- * @param <V2> the value type of the new stream
+ * @param <R> the value type of the new stream
*/
- <V1, V2> KStream<K, V2> join(
+ <V1, R> KStream<K, R> join(
KStream<K, V1> otherStream,
- ValueJoiner<V, V1, V2> joiner,
+ ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
Serializer<K> keySerializer,
Serializer<V> thisValueSerializer,
@@ -217,11 +217,11 @@ public interface KStream<K, V> {
* @param otherValueDeserializer value deserializer for other stream,
* if not specified the default serializer defined in the configs will be used
* @param <V1> the value type of the other stream
- * @param <V2> the value type of the new stream
+ * @param <R> the value type of the new stream
*/
- <V1, V2> KStream<K, V2> outerJoin(
+ <V1, R> KStream<K, R> outerJoin(
KStream<K, V1> otherStream,
- ValueJoiner<V, V1, V2> joiner,
+ ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
Serializer<K> keySerializer,
Serializer<V> thisValueSerializer,
@@ -245,11 +245,11 @@ public interface KStream<K, V> {
* @param otherValueDeserializer value deserializer for other stream,
* if not specified the default serializer defined in the configs will be used
* @param <V1> the value type of the other stream
- * @param <V2> the value type of the new stream
+ * @param <R> the value type of the new stream
*/
- <V1, V2> KStream<K, V2> leftJoin(
+ <V1, R> KStream<K, R> leftJoin(
KStream<K, V1> otherStream,
- ValueJoiner<V, V1, V2> joiner,
+ ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
Serializer<K> keySerializer,
Serializer<V1> otherValueSerializer,
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 9837dae..93eceec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -111,10 +111,10 @@ public interface KTable<K, V> {
* @param other the instance of KTable joined with this stream
* @param joiner ValueJoiner
* @param <V1> the value type of the other stream
- * @param <V2> the value type of the new stream
+ * @param <R> the value type of the new stream
* @return the instance of KTable
*/
- <V1, V2> KTable<K, V2> join(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+ <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
/**
* Combines values of this KTable with another KTable using Outer Join.
@@ -122,10 +122,10 @@ public interface KTable<K, V> {
* @param other the instance of KTable joined with this stream
* @param joiner ValueJoiner
* @param <V1> the value type of the other stream
- * @param <V2> the value type of the new stream
+ * @param <R> the value type of the new stream
* @return the instance of KTable
*/
- <V1, V2> KTable<K, V2> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+ <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
/**
* Combines values of this KTable with another KTable using Left Join.
@@ -133,10 +133,10 @@ public interface KTable<K, V> {
* @param other the instance of KTable joined with this stream
* @param joiner ValueJoiner
* @param <V1> the value type of the other stream
- * @param <V2> the value type of the new stream
+ * @param <R> the value type of the new stream
* @return the instance of KTable
*/
- <V1, V2> KTable<K, V2> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+ <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
/**
* Aggregate values of this table by the selected key.
@@ -148,14 +148,14 @@ public interface KTable<K, V> {
* @param <V1> the value type of the aggregated table
* @return the instance of KTable
*/
- <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
+ <K1, V1, T> KTable<K1, T> aggregate(AggregatorSupplier<K1, V1, T> aggregatorSupplier,
KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
Serializer<K1> keySerializer,
Serializer<V1> valueSerializer,
- Serializer<V2> aggValueSerializer,
+ Serializer<T> aggValueSerializer,
Deserializer<K1> keyDeserializer,
Deserializer<V1> valueDeserializer,
- Deserializer<V2> aggValueDeserializer,
+ Deserializer<T> aggValueDeserializer,
String name);
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
deleted file mode 100644
index ffdb4ad..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-
-import org.apache.kafka.streams.kstream.internals.SlidingWindow;
-
-import java.util.Collection;
-import java.util.Collections;
-
-public class SlidingWindows extends Windows<SlidingWindow> {
-
- private static final long DEFAULT_SIZE_MS = 1000L;
-
- public final long size;
-
- private SlidingWindows(String name, long size) {
- super(name);
-
- this.size = size;
- }
-
- /**
- * Returns a half-interval sliding window definition with the default window size
- */
- public static SlidingWindows of(String name) {
- return new SlidingWindows(name, DEFAULT_SIZE_MS);
- }
-
- /**
- * Returns a half-interval sliding window definition with the window size in milliseconds
- */
- public SlidingWindows with(long size) {
- return new SlidingWindows(this.name, size);
- }
-
- @Override
- public Collection<SlidingWindow> windowsFor(long timestamp) {
- // TODO
- return Collections.<SlidingWindow>emptyList();
- }
-
- @Override
- public boolean equalTo(Windows other) {
- if (!other.getClass().equals(SlidingWindows.class))
- return false;
-
- SlidingWindows otherWindows = (SlidingWindows) other;
-
- return this.size == otherWindows.size;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
new file mode 100644
index 0000000..02ece3a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+import org.apache.kafka.streams.kstream.internals.TumblingWindow;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class TumblingWindows extends Windows<TumblingWindow> {
+
+ private static final long DEFAULT_SIZE_MS = 1000L;
+
+ public final long size;
+
+ private TumblingWindows(String name, long size) {
+ super(name);
+
+ this.size = size;
+ }
+
+ /**
+ * Returns a half-interval sliding window definition with the default window size
+ */
+ public static TumblingWindows of(String name) {
+ return new TumblingWindows(name, DEFAULT_SIZE_MS);
+ }
+
+ /**
+ * Returns a half-interval sliding window definition with the window size in milliseconds
+ */
+ public TumblingWindows with(long size) {
+ return new TumblingWindows(this.name, size);
+ }
+
+ @Override
+ public Map<Long, TumblingWindow> windowsFor(long timestamp) {
+ long windowStart = timestamp - timestamp % size;
+
+ return Collections.singletonMap(windowStart, new TumblingWindow(windowStart, windowStart + size));
+ }
+
+ @Override
+ public boolean equalTo(Windows other) {
+ if (!other.getClass().equals(TumblingWindows.class))
+ return false;
+
+ TumblingWindows otherWindows = (TumblingWindows) other;
+
+ return this.size == otherWindows.size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 89cb0a8..6f47253 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
-import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
public class UnlimitedWindows extends Windows<UnlimitedWindow> {
@@ -46,9 +46,9 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
}
@Override
- public Collection<UnlimitedWindow> windowsFor(long timestamp) {
- // TODO
- return Collections.<UnlimitedWindow>emptyList();
+ public Map<Long, UnlimitedWindow> windowsFor(long timestamp) {
+ // always return the single unlimited window
+ return Collections.singletonMap(start, new UnlimitedWindow(start));
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index 63e0a35..b9401b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -48,4 +48,23 @@ public abstract class Window {
public boolean equalsTo(Window other) {
return this.start() == other.start() && this.end() == other.end();
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+
+ if (!(obj instanceof Window))
+ return false;
+
+ Window other = (Window) obj;
+
+ return this.equalsTo(other) && this.start == other.start && this.end == other.end;
+ }
+
+ @Override
+ public int hashCode() {
+ long n = (this.start << 32) | this.end;
+ return (int) (n % 0xFFFFFFFFL);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 03fb656..10afc73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -35,4 +35,9 @@ public class Windowed<T> {
public Window window() {
return window;
}
+
+ @Override
+ public String toString() {
+ return "[" + value + "@" + window.start() + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index ab8d822..e4d7d9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -17,25 +17,31 @@
package org.apache.kafka.streams.kstream;
-import java.util.Collection;
+
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class Windows<W extends Window> {
+ private static final int DEFAULT_NUM_SEGMENTS = 3;
+
private static final long DEFAULT_EMIT_DURATION = 1000L;
private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L; // one day
private static final AtomicInteger NAME_INDEX = new AtomicInteger(0);
+ protected String name;
+
private long emitDuration;
private long maintainDuration;
- protected String name;
+ public int segments;
protected Windows(String name) {
this.name = name;
+ this.segments = DEFAULT_NUM_SEGMENTS;
this.emitDuration = DEFAULT_EMIT_DURATION;
this.maintainDuration = DEFAULT_MAINTAIN_DURATION;
}
@@ -62,6 +68,19 @@ public abstract class Windows<W extends Window> {
return this;
}
+ /**
+ * Specifies the number of segments to be used for rolling the window store,
+ * this function is not exposed to users but can be called by developers that extend this JoinWindows specs
+ *
+ * @param segments
+ * @return
+ */
+ protected Windows segments(int segments) {
+ this.segments = segments;
+
+ return this;
+ }
+
public long emitEveryMs() {
return this.emitDuration;
}
@@ -74,7 +93,7 @@ public abstract class Windows<W extends Window> {
return prefix + String.format("%010d", NAME_INDEX.getAndIncrement());
}
- abstract boolean equalTo(Windows other);
+ public abstract boolean equalTo(Windows other);
- abstract Collection<W> windowsFor(long timestamp);
+ public abstract Map<Long, W> windowsFor(long timestamp);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
new file mode 100644
index 0000000..f02f53a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggWindow.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public class KStreamAggWindow<K, V> implements ProcessorSupplier<K, V> {
+
+ @Override
+ public Processor<K, V> get() {
+ return new KStreamAggWindowProcessor();
+ }
+
+ private class KStreamAggWindowProcessor extends AbstractProcessor<K, V> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+ }
+
+ @Override
+ public void process(K key, V value) {
+ // create a dummy window just for wrapping the timestamp
+ long timestamp = context().timestamp();
+
+ // send the new aggregate value
+ context().forward(new Windowed<>(key, new UnlimitedWindow(timestamp)), new Change<>(value, null));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
new file mode 100644
index 0000000..5745a03
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class KStreamAggregate<K, V, T, W extends Window> implements KTableProcessorSupplier<Windowed<K>, V, T> {
+
+ private final String storeName;
+ private final Windows<W> windows;
+ private final Aggregator<K, V, T> aggregator;
+
+ private boolean sendOldValues = false;
+
+ public KStreamAggregate(Windows<W> windows, String storeName, Aggregator<K, V, T> aggregator) {
+ this.windows = windows;
+ this.storeName = storeName;
+ this.aggregator = aggregator;
+ }
+
+ @Override
+ public Processor<Windowed<K>, Change<V>> get() {
+ return new KStreamAggregateProcessor();
+ }
+
+ @Override
+ public void enableSendingOldValues() {
+ sendOldValues = true;
+ }
+
+ private class KStreamAggregateProcessor extends AbstractProcessor<Windowed<K>, Change<V>> {
+
+ private WindowStore<K, T> windowStore;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+
+ windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+ }
+
+ @Override
+ public void process(Windowed<K> windowedKey, Change<V> change) {
+ // first get the matching windows
+ long timestamp = windowedKey.window().start();
+ K key = windowedKey.value();
+ V value = change.newValue;
+
+ Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+
+ long timeFrom = Long.MAX_VALUE;
+ long timeTo = Long.MIN_VALUE;
+
+ // use range query on window store for efficient reads
+ for (long windowStartMs : matchedWindows.keySet()) {
+ timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
+ timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
+ }
+
+ WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo);
+
+ // for each matching window, try to update the corresponding key and send to the downstream
+ while (iter.hasNext()) {
+ KeyValue<Long, T> entry = iter.next();
+ W window = matchedWindows.get(entry.key);
+
+ if (window != null) {
+
+ T oldAgg = entry.value;
+
+ if (oldAgg == null)
+ oldAgg = aggregator.initialValue();
+
+ // try to add the new new value (there will never be old value)
+ T newAgg = aggregator.add(key, value, oldAgg);
+
+ // update the store with the new value
+ windowStore.put(key, newAgg, window.start());
+
+ // forward the aggregated change pair
+ if (sendOldValues)
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+ else
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+
+ matchedWindows.remove(entry.key);
+ }
+ }
+
+ iter.close();
+
+ // create the new window for the rest of unmatched window that do not exist yet
+ for (long windowStartMs : matchedWindows.keySet()) {
+ T oldAgg = aggregator.initialValue();
+ T newAgg = aggregator.add(key, value, oldAgg);
+
+ windowStore.put(key, newAgg, windowStartMs);
+
+ // send the new aggregate pair
+ if (sendOldValues)
+ context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, oldAgg));
+ else
+ context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(newAgg, null));
+ }
+ }
+ }
+
+ @Override
+ public KTableValueGetterSupplier<Windowed<K>, T> view() {
+
+ return new KTableValueGetterSupplier<Windowed<K>, T>() {
+
+ public KTableValueGetter<Windowed<K>, T> get() {
+ return new KStreamAggregateValueGetter();
+ }
+
+ };
+ }
+
+ private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, T> {
+
+ private WindowStore<K, T> windowStore;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T get(Windowed<K> windowedKey) {
+ K key = windowedKey.value();
+ W window = (W) windowedKey.window();
+
+ // this iterator should only contain one element
+ Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start());
+
+ return iter.next().value;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index 175a002..daef8b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -23,23 +23,23 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-class KStreamFlatMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
+class KStreamFlatMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
- private final KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper;
+ private final KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper;
- KStreamFlatMap(KeyValueMapper<K1, V1, Iterable<KeyValue<K2, V2>>> mapper) {
+ KStreamFlatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
this.mapper = mapper;
}
@Override
- public Processor<K1, V1> get() {
+ public Processor<K, V> get() {
return new KStreamFlatMapProcessor();
}
- private class KStreamFlatMapProcessor extends AbstractProcessor<K1, V1> {
+ private class KStreamFlatMapProcessor extends AbstractProcessor<K, V> {
@Override
- public void process(K1 key, V1 value) {
- for (KeyValue<K2, V2> newPair : mapper.apply(key, value)) {
+ public void process(K key, V value) {
+ for (KeyValue<K1, V1> newPair : mapper.apply(key, value)) {
context().forward(newPair.key, newPair.value);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
index 9b4559b..97d6b7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java
@@ -22,24 +22,24 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-class KStreamFlatMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {
+class KStreamFlatMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
- private final ValueMapper<V1, ? extends Iterable<V2>> mapper;
+ private final ValueMapper<V, ? extends Iterable<V1>> mapper;
- KStreamFlatMapValues(ValueMapper<V1, ? extends Iterable<V2>> mapper) {
+ KStreamFlatMapValues(ValueMapper<V, ? extends Iterable<V1>> mapper) {
this.mapper = mapper;
}
@Override
- public Processor<K1, V1> get() {
+ public Processor<K, V> get() {
return new KStreamFlatMapValuesProcessor();
}
- private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K1, V1> {
+ private class KStreamFlatMapValuesProcessor extends AbstractProcessor<K, V> {
@Override
- public void process(K1 key, V1 value) {
- Iterable<V2> newValues = mapper.apply(value);
- for (V2 v : newValues) {
+ public void process(K key, V value) {
+ Iterable<V1> newValues = mapper.apply(value);
+ for (V1 v : newValues) {
context().forward(key, v);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 2459f0d..7b634dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -305,28 +305,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
RocksDBWindowStoreSupplier<K, V> thisWindow =
new RocksDBWindowStoreSupplier<>(
windows.name() + "-this",
- windows.before,
- windows.after,
windows.maintainMs(),
windows.segments,
+ true,
new Serdes<>("", keySerializer, keyDeserializer, thisValueSerializer, thisValueDeserializer),
null);
RocksDBWindowStoreSupplier<K, V1> otherWindow =
new RocksDBWindowStoreSupplier<>(
windows.name() + "-other",
- windows.before,
- windows.after,
windows.maintainMs(),
windows.segments,
+ true,
new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
null);
- KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name());
- KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());
+ KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
+ KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
+
+ KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, outer);
+ KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), windows.before, windows.after, reverseJoiner(joiner), outer);
- KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, outer);
- KStreamKStreamJoin<K, R, V1, V> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), reverseJoiner(joiner), outer);
KStreamPassThrough<K, R> joinMerge = new KStreamPassThrough<>();
String thisWindowStreamName = topology.newName(WINDOWED_NAME);
@@ -362,15 +361,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
RocksDBWindowStoreSupplier<K, V1> otherWindow =
new RocksDBWindowStoreSupplier<>(
windows.name() + "-this",
- windows.before,
- windows.after,
windows.maintainMs(),
windows.segments,
+ true,
new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
null);
- KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name());
- KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), joiner, true);
+ KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
+ KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
String otherWindowStreamName = topology.newName(WINDOWED_NAME);
String joinThisName = topology.newName(LEFTJOIN_NAME);
@@ -401,8 +399,31 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Serializer<T> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<T> aggValueDeserializer) {
- // TODO
- return null;
+
+ // TODO: this agg window operator is only used for casting K to Windowed<K> for
+ // KTableProcessorSupplier, which is a bit awkward and better be removed in the future
+ String aggregateName = topology.newName(AGGREGATE_NAME);
+ String aggWindowName = topology.newName(WINDOWED_NAME);
+
+ ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
+ ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregatorSupplier.get());
+
+ RocksDBWindowStoreSupplier<K, T> aggregateStore =
+ new RocksDBWindowStoreSupplier<>(
+ windows.name(),
+ windows.maintainMs(),
+ windows.segments,
+ false,
+ new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer),
+ null);
+
+ // aggregate the values with the aggregator and local store
+ topology.addProcessor(aggWindowName, aggWindowSupplier, this.name);
+ topology.addProcessor(aggregateName, aggregateSupplier, aggWindowName);
+ topology.addStateStore(aggregateStore, aggregateName);
+
+ // return the KTable representation with the intermediate topic as the sources
+ return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index b122aa1..4f427d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -26,9 +27,14 @@ import org.apache.kafka.streams.state.WindowStore;
class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
private final String windowName;
+ private final long windowSizeMs;
+ private final long retentionPeriodMs;
- KStreamJoinWindow(String windowName) {
+
+ KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) {
this.windowName = windowName;
+ this.windowSizeMs = windowSizeMs;
+ this.retentionPeriodMs = retentionPeriodMs;
}
@Override
@@ -46,6 +52,9 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V> {
super.init(context);
window = (WindowStore<K, V>) context.getStateStore(windowName);
+
+ if (windowSizeMs * 2 > retentionPeriodMs)
+ throw new KafkaException("The retention period must be at least two times the join window size.");
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 8a9bf6c..01e3325 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -29,11 +30,16 @@ import java.util.Iterator;
class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
private final String otherWindowName;
+ private final long joinBeforeMs;
+ private final long joinAfterMs;
+
private final ValueJoiner<V1, V2, R> joiner;
private final boolean outer;
- KStreamKStreamJoin(String otherWindowName, ValueJoiner<V1, V2, R> joiner, boolean outer) {
+ KStreamKStreamJoin(String otherWindowName, long joinBeforeMs, long joinAfterMs, ValueJoiner<V1, V2, R> joiner, boolean outer) {
this.otherWindowName = otherWindowName;
+ this.joinBeforeMs = joinBeforeMs;
+ this.joinAfterMs = joinAfterMs;
this.joiner = joiner;
this.outer = outer;
}
@@ -59,10 +65,13 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
public void process(K key, V1 value) {
boolean needOuterJoin = KStreamKStreamJoin.this.outer;
- Iterator<V2> iter = otherWindow.fetch(key, context().timestamp());
+ long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
+ long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
+
+ Iterator<KeyValue<Long, V2>> iter = otherWindow.fetch(key, timeFrom, timeTo);
while (iter.hasNext()) {
needOuterJoin = false;
- context().forward(key, joiner.apply(value, iter.next()));
+ context().forward(key, joiner.apply(value, iter.next().value));
}
if (needOuterJoin)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
index 51a6277..dfca019 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
@@ -23,12 +23,12 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-class KStreamKTableLeftJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> {
+class KStreamKTableLeftJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
- private final ValueJoiner<V1, V2, V> joiner;
+ private final ValueJoiner<V1, V2, R> joiner;
- KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, V> joiner) {
+ KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, R> joiner) {
this.valueGetterSupplier = table.valueGetterSupplier();
this.joiner = joiner;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index 3868318..57f1431 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -23,23 +23,23 @@ import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-class KStreamMap<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
+class KStreamMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
- private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
+ private final KeyValueMapper<K, V, KeyValue<K1, V1>> mapper;
- public KStreamMap(KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
+ public KStreamMap(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
this.mapper = mapper;
}
@Override
- public Processor<K1, V1> get() {
+ public Processor<K, V> get() {
return new KStreamMapProcessor();
}
- private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
+ private class KStreamMapProcessor extends AbstractProcessor<K, V> {
@Override
- public void process(K1 key, V1 value) {
- KeyValue<K2, V2> newPair = mapper.apply(key, value);
+ public void process(K key, V value) {
+ KeyValue<K1, V1> newPair = mapper.apply(key, value);
context().forward(newPair.key, newPair.value);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
index 692b421..06667e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java
@@ -22,23 +22,23 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-class KStreamMapValues<K1, V1, V2> implements ProcessorSupplier<K1, V1> {
+class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
- private final ValueMapper<V1, V2> mapper;
+ private final ValueMapper<V, V1> mapper;
- public KStreamMapValues(ValueMapper<V1, V2> mapper) {
+ public KStreamMapValues(ValueMapper<V, V1> mapper) {
this.mapper = mapper;
}
@Override
- public Processor<K1, V1> get() {
+ public Processor<K, V> get() {
return new KStreamMapProcessor();
}
- private class KStreamMapProcessor extends AbstractProcessor<K1, V1> {
+ private class KStreamMapProcessor extends AbstractProcessor<K, V> {
@Override
- public void process(K1 key, V1 value) {
- V2 newValue = mapper.apply(value);
+ public void process(K key, V value) {
+ V1 newValue = mapper.apply(value);
context().forward(key, newValue);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 7ebab0e..a9d8f97 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -24,16 +24,16 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-public class KStreamTransform<K1, V1, K2, V2> implements ProcessorSupplier<K1, V1> {
+public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
- private final TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier;
+ private final TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier;
- public KStreamTransform(TransformerSupplier<K1, V1, KeyValue<K2, V2>> transformerSupplier) {
+ public KStreamTransform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier) {
this.transformerSupplier = transformerSupplier;
}
@Override
- public Processor<K1, V1> get() {
+ public Processor<K, V> get() {
return new KStreamTransformProcessor(transformerSupplier.get());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
index ad987dd..5e441aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
@@ -19,19 +19,19 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueJoiner;
-abstract class KTableKTableAbstractJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
+abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements KTableProcessorSupplier<K, V1, R> {
protected final KTableImpl<K, ?, V1> table1;
protected final KTableImpl<K, ?, V2> table2;
protected final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
protected final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
- protected final ValueJoiner<V1, V2, V> joiner;
+ protected final ValueJoiner<V1, V2, R> joiner;
protected boolean sendOldValues = false;
KTableKTableAbstractJoin(KTableImpl<K, ?, V1> table1,
KTableImpl<K, ?, V2> table2,
- ValueJoiner<V1, V2, V> joiner) {
+ ValueJoiner<V1, V2, R> joiner) {
this.table1 = table1;
this.table2 = table2;
this.valueGetterSupplier1 = table1.valueGetterSupplier();
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 9716edd..6eb27b6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
- KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+ KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
super(table1, table2, joiner);
}
@@ -34,10 +34,10 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1,
}
@Override
- public KTableValueGetterSupplier<K, V> view() {
- return new KTableValueGetterSupplier<K, V>() {
+ public KTableValueGetterSupplier<K, R> view() {
+ return new KTableValueGetterSupplier<K, R>() {
- public KTableValueGetter<K, V> get() {
+ public KTableValueGetter<K, R> get() {
return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
@@ -61,8 +61,8 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1,
@Override
public void process(K key, Change<V1> change) {
- V newValue = null;
- V oldValue = null;
+ R newValue = null;
+ R oldValue = null;
V2 value2 = null;
if (change.newValue != null || change.oldValue != null)
@@ -78,7 +78,7 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1,
}
}
- private class KTableKTableJoinValueGetter implements KTableValueGetter<K, V> {
+ private class KTableKTableJoinValueGetter implements KTableValueGetter<K, R> {
private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2;
@@ -95,8 +95,8 @@ class KTableKTableJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1,
}
@Override
- public V get(K key) {
- V newValue = null;
+ public R get(K key) {
+ R newValue = null;
V1 value1 = valueGetter1.get(key);
if (value1 != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index b10bdb5..00e872e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
- KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+ KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
super(table1, table2, joiner);
}
@@ -34,10 +34,10 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
}
@Override
- public KTableValueGetterSupplier<K, V> view() {
- return new KTableValueGetterSupplier<K, V>() {
+ public KTableValueGetterSupplier<K, R> view() {
+ return new KTableValueGetterSupplier<K, R>() {
- public KTableValueGetter<K, V> get() {
+ public KTableValueGetter<K, R> get() {
return new KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
@@ -61,8 +61,8 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
@Override
public void process(K key, Change<V1> change) {
- V newValue = null;
- V oldValue = null;
+ R newValue = null;
+ R oldValue = null;
V2 value2 = null;
if (change.newValue != null || change.oldValue != null)
@@ -79,7 +79,7 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
}
- private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, V> {
+ private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R> {
private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2;
@@ -96,7 +96,7 @@ class KTableKTableLeftJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
}
@Override
- public V get(K key) {
+ public R get(K key) {
V1 value1 = valueGetter1.get(key);
if (value1 != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index b859b34..6ab0ae9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
- KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+ KTableKTableOuterJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
super(table1, table2, joiner);
}
@@ -34,10 +34,10 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
}
@Override
- public KTableValueGetterSupplier<K, V> view() {
- return new KTableValueGetterSupplier<K, V>() {
+ public KTableValueGetterSupplier<K, R> view() {
+ return new KTableValueGetterSupplier<K, R>() {
- public KTableValueGetter<K, V> get() {
+ public KTableValueGetter<K, R> get() {
return new KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
@@ -61,8 +61,8 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
@Override
public void process(K key, Change<V1> change) {
- V newValue = null;
- V oldValue = null;
+ R newValue = null;
+ R oldValue = null;
V2 value2 = valueGetter.get(key);
if (change.newValue != null || value2 != null)
@@ -77,7 +77,7 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
}
}
- private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, V> {
+ private class KTableKTableOuterJoinValueGetter implements KTableValueGetter<K, R> {
private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2;
@@ -94,8 +94,8 @@ class KTableKTableOuterJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
}
@Override
- public V get(K key) {
- V newValue = null;
+ public R get(K key) {
+ R newValue = null;
V1 value1 = valueGetter1.get(key);
V2 value2 = valueGetter2.get(key);
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index f20e987..a6a13fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -22,10 +22,10 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V, V1, V2> {
+class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
- KTableKTableRightJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, V> joiner) {
+ KTableKTableRightJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<V1, V2, R> joiner) {
super(table1, table2, joiner);
}
@@ -35,10 +35,10 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
}
@Override
- public KTableValueGetterSupplier<K, V> view() {
- return new KTableValueGetterSupplier<K, V>() {
+ public KTableValueGetterSupplier<K, R> view() {
+ return new KTableValueGetterSupplier<K, R>() {
- public KTableValueGetter<K, V> get() {
+ public KTableValueGetter<K, R> get() {
return new KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
@@ -62,8 +62,8 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
@Override
public void process(K key, Change<V1> change) {
- V newValue = null;
- V oldValue = null;
+ R newValue = null;
+ R oldValue = null;
V2 value2 = valueGetter.get(key);
if (value2 != null) {
@@ -77,7 +77,7 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
}
- private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, V> {
+ private class KTableKTableRightJoinValueGetter implements KTableValueGetter<K, R> {
private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2;
@@ -94,7 +94,7 @@ class KTableKTableRightJoin<K, V, V1, V2> extends KTableKTableAbstractJoin<K, V,
}
@Override
- public V get(K key) {
+ public R get(K key) {
V2 value2 = valueGetter2.get(key);
if (value2 != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index c664906..244d8ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -23,30 +23,30 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> {
+class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
- private final KTableImpl<K1, ?, V1> parent;
- private final ValueMapper<V1, V2> mapper;
+ private final KTableImpl<K, ?, V> parent;
+ private final ValueMapper<V, V1> mapper;
private boolean sendOldValues = false;
- public KTableMapValues(KTableImpl<K1, ?, V1> parent, ValueMapper<V1, V2> mapper) {
+ public KTableMapValues(KTableImpl<K, ?, V> parent, ValueMapper<V, V1> mapper) {
this.parent = parent;
this.mapper = mapper;
}
@Override
- public Processor<K1, Change<V1>> get() {
+ public Processor<K, Change<V>> get() {
return new KTableMapValuesProcessor();
}
@Override
- public KTableValueGetterSupplier<K1, V2> view() {
- final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier();
+ public KTableValueGetterSupplier<K, V1> view() {
+ final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
- return new KTableValueGetterSupplier<K1, V2>() {
+ return new KTableValueGetterSupplier<K, V1>() {
- public KTableValueGetter<K1, V2> get() {
+ public KTableValueGetter<K, V1> get() {
return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
}
@@ -59,8 +59,8 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
sendOldValues = true;
}
- private V2 computeValue(V1 value) {
- V2 newValue = null;
+ private V1 computeValue(V value) {
+ V1 newValue = null;
if (value != null)
newValue = mapper.apply(value);
@@ -68,22 +68,22 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
return newValue;
}
- private class KTableMapValuesProcessor extends AbstractProcessor<K1, Change<V1>> {
+ private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>> {
@Override
- public void process(K1 key, Change<V1> change) {
- V2 newValue = computeValue(change.newValue);
- V2 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
+ public void process(K key, Change<V> change) {
+ V1 newValue = computeValue(change.newValue);
+ V1 oldValue = sendOldValues ? computeValue(change.oldValue) : null;
context().forward(key, new Change<>(newValue, oldValue));
}
}
- private class KTableMapValuesValueGetter implements KTableValueGetter<K1, V2> {
+ private class KTableMapValuesValueGetter implements KTableValueGetter<K, V1> {
- private final KTableValueGetter<K1, V1> parentGetter;
+ private final KTableValueGetter<K, V> parentGetter;
- public KTableMapValuesValueGetter(KTableValueGetter<K1, V1> parentGetter) {
+ public KTableMapValuesValueGetter(KTableValueGetter<K, V> parentGetter) {
this.parentGetter = parentGetter;
}
@@ -93,7 +93,7 @@ class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2>
}
@Override
- public V2 get(K1 key) {
+ public V1 get(K key) {
return computeValue(parentGetter.get(key));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index bbef7fb..12fcc17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -29,28 +29,28 @@ import org.apache.kafka.streams.processor.ProcessorContext;
*
* Given the input, it can output at most two records (one mapped from old value and one mapped from new value).
*/
-public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupplier<K1, V1, KeyValue<K2, V2>> {
+public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSupplier<K, V, KeyValue<K1, V1>> {
- private final KTableImpl<K1, ?, V1> parent;
- private final KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper;
+ private final KTableImpl<K, ?, V> parent;
+ private final KeyValueMapper<K, V, KeyValue<K1, V1>> mapper;
- public KTableRepartitionMap(KTableImpl<K1, ?, V1> parent, KeyValueMapper<K1, V1, KeyValue<K2, V2>> mapper) {
+ public KTableRepartitionMap(KTableImpl<K, ?, V> parent, KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
this.parent = parent;
this.mapper = mapper;
}
@Override
- public Processor<K1, Change<V1>> get() {
+ public Processor<K, Change<V>> get() {
return new KTableMapProcessor();
}
@Override
- public KTableValueGetterSupplier<K1, KeyValue<K2, V2>> view() {
- final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier();
+ public KTableValueGetterSupplier<K, KeyValue<K1, V1>> view() {
+ final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
- return new KTableValueGetterSupplier<K1, KeyValue<K2, V2>>() {
+ return new KTableValueGetterSupplier<K, KeyValue<K1, V1>>() {
- public KTableValueGetter<K1, KeyValue<K2, V2>> get() {
+ public KTableValueGetter<K, KeyValue<K1, V1>> get() {
return new KTableMapValueGetter(parentValueGetterSupplier.get());
}
@@ -63,8 +63,8 @@ public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupp
throw new KafkaException("KTableRepartitionMap should always require sending old values.");
}
- private KeyValue<K2, V2> computeValue(K1 key, V1 value) {
- KeyValue<K2, V2> newValue = null;
+ private KeyValue<K1, V1> computeValue(K key, V value) {
+ KeyValue<K1, V1> newValue = null;
if (key != null || value != null)
newValue = mapper.apply(key, value);
@@ -72,26 +72,26 @@ public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupp
return newValue;
}
- private class KTableMapProcessor extends AbstractProcessor<K1, Change<V1>> {
+ private class KTableMapProcessor extends AbstractProcessor<K, Change<V>> {
@Override
- public void process(K1 key, Change<V1> change) {
- KeyValue<K2, V2> newPair = computeValue(key, change.newValue);
+ public void process(K key, Change<V> change) {
+ KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
context().forward(newPair.key, new Change<>(newPair.value, null));
if (change.oldValue != null) {
- KeyValue<K2, V2> oldPair = computeValue(key, change.oldValue);
+ KeyValue<K1, V1> oldPair = computeValue(key, change.oldValue);
context().forward(oldPair.key, new Change<>(null, oldPair.value));
}
}
}
- private class KTableMapValueGetter implements KTableValueGetter<K1, KeyValue<K2, V2>> {
+ private class KTableMapValueGetter implements KTableValueGetter<K, KeyValue<K1, V1>> {
- private final KTableValueGetter<K1, V1> parentGetter;
+ private final KTableValueGetter<K, V> parentGetter;
- public KTableMapValueGetter(KTableValueGetter<K1, V1> parentGetter) {
+ public KTableMapValueGetter(KTableValueGetter<K, V> parentGetter) {
this.parentGetter = parentGetter;
}
@@ -101,7 +101,7 @@ public class KTableRepartitionMap<K1, V1, K2, V2> implements KTableProcessorSupp
}
@Override
- public KeyValue<K2, V2> get(K1 key) {
+ public KeyValue<K1, V1> get(K key) {
return computeValue(key, parentGetter.get(key));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
deleted file mode 100644
index a6b5149..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindow.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-
-import org.apache.kafka.streams.kstream.Window;
-
-public class SlidingWindow extends Window {
-
- public SlidingWindow(long start, long end) {
- super(start, end);
- }
-
- @Override
- public boolean overlap(Window other) {
- return super.overlap(other) && other.getClass().equals(SlidingWindow.class);
- }
-
- @Override
- public boolean equalsTo(Window other) {
- return super.equalsTo(other) && other.getClass().equals(SlidingWindow.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
new file mode 100644
index 0000000..a02d4b9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+
+import org.apache.kafka.streams.kstream.Window;
+
+public class TumblingWindow extends Window {
+
+ public TumblingWindow(long start, long end) {
+ super(start, end);
+ }
+
+ @Override
+ public boolean overlap(Window other) {
+ return super.overlap(other) && other.getClass().equals(TumblingWindow.class);
+ }
+
+ @Override
+ public boolean equalsTo(Window other) {
+ return super.equalsTo(other) && other.getClass().equals(TumblingWindow.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
index d4ed0e7..cfcfb00 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -97,20 +98,25 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
}
@Override
- public WindowStoreIterator<V> fetch(K key, long timestamp) {
- return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timestamp), this.rangeTime);
+ public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
+ return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime);
}
@Override
public void put(K key, V value) {
- putAndReturnInternalKey(key, value);
+ putAndReturnInternalKey(key, value, -1L);
}
@Override
- public byte[] putAndReturnInternalKey(K key, V value) {
+ public void put(K key, V value, long timestamp) {
+ putAndReturnInternalKey(key, value, timestamp);
+ }
+
+ @Override
+ public byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
long startNs = time.nanoseconds();
try {
- byte[] binKey = this.inner.putAndReturnInternalKey(key, value);
+ byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp);
if (loggingEnabled) {
changeLogger.add(binKey);
@@ -174,7 +180,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
}
@Override
- public E next() {
+ public KeyValue<Long, E> next() {
return iter.next();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
index a32faf4..62b9f2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
@@ -222,6 +222,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public void close() {
+ iter.dispose();
}
}