You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/09 08:33:50 UTC
kafka git commit: KAFKA-2962: stream-table table-table joins
Repository: kafka
Updated Branches:
refs/heads/trunk 454d7d090 -> 991aad23b
KAFKA-2962: stream-table table-table joins
guozhangwang
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #644 from ymatsuda/join_methods
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/991aad23
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/991aad23
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/991aad23
Branch: refs/heads/trunk
Commit: 991aad23baa2f55d405d374b0a01785acdc63974
Parents: 454d7d0
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Dec 8 23:33:46 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 8 23:33:46 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/KStream.java | 10 ++
.../kafka/streams/kstream/KStreamBuilder.java | 2 +-
.../apache/kafka/streams/kstream/KTable.java | 33 +++++
.../kstream/internals/AbstractStream.java | 64 ++++++++
.../streams/kstream/internals/KStreamImpl.java | 35 +++--
.../streams/kstream/internals/KStreamJoin.java | 9 --
.../internals/KStreamKTableLeftJoin.java | 62 ++++++++
.../kstream/internals/KStreamWindowedImpl.java | 2 +-
.../KTableDerivedValueGetterSupplier.java | 28 ----
.../streams/kstream/internals/KTableFilter.java | 15 +-
.../streams/kstream/internals/KTableImpl.java | 147 ++++++++++++-------
.../kstream/internals/KTableKTableJoin.java | 115 +++++++++++++++
.../kstream/internals/KTableKTableLeftJoin.java | 112 ++++++++++++++
.../kstream/internals/KTableMapValues.java | 14 +-
.../streams/kstream/internals/KTableMerge.java | 92 ++++++++++++
.../internals/KTableProcessorSupplier.java | 4 +-
.../streams/kstream/internals/KTableSource.java | 6 +-
17 files changed, 634 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 93303eb..d3931ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -172,4 +172,14 @@ public interface KStream<K, V> {
*/
void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
+ /**
+ * Combines values of this stream with KTable using Left Join.
+ *
+ * @param ktable the instance of KTable joined with this stream
+ * @param joiner ValueJoiner
+ * @param <V1> the value type of the KTable
+ * @param <V2> the value type of the new stream
+ */
+ <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> ktable, ValueJoiner<V, V1, V2> joiner);
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index ca1a10d..0ed5144 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -102,7 +102,7 @@ public class KStreamBuilder extends TopologyBuilder {
KTableProcessorSupplier<K, V, V> processorSupplier = new KTableSource<>(topic);
addProcessor(name, processorSupplier, source);
- return new KTableImpl<>(this, name, processorSupplier, source, topic, keySerializer, valSerializer, keyDeserializer, valDeserializer);
+ return new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerializer, valSerializer, keyDeserializer, valDeserializer);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 75fb87a..c6e7975 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -105,4 +105,37 @@ public interface KTable<K, V> {
*/
KStream<K, V> toStream();
+ /**
+ * Combines values of this KTable with another KTable using Inner Join.
+ *
+ * @param other the instance of KTable joined with this KTable
+ * @param joiner ValueJoiner
+ * @param <V1> the value type of the other KTable
+ * @param <V2> the value type of the new KTable
+ * @return the instance of KTable holding the joined values
+ */
+ <V1, V2> KTable<K, V2> join(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+
+ /**
+ * Combines values of this KTable with another KTable using Outer Join.
+ *
+ * @param other the instance of KTable joined with this KTable
+ * @param joiner ValueJoiner
+ * @param <V1> the value type of the other KTable
+ * @param <V2> the value type of the new KTable
+ * @return the instance of KTable holding the joined values
+ */
+ <V1, V2> KTable<K, V2> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+
+ /**
+ * Combines values of this KTable with another KTable using Left Join.
+ *
+ * @param other the instance of KTable joined with this KTable
+ * @param joiner ValueJoiner
+ * @param <V1> the value type of the other KTable
+ * @param <V2> the value type of the new KTable
+ * @return the instance of KTable holding the joined values
+ */
+ <V1, V2> KTable<K, V2> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, V2> joiner);
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
new file mode 100644
index 0000000..fa34ba1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public abstract class AbstractStream<K> {
+
+ protected final KStreamBuilder topology;
+ protected final String name;
+ protected final Set<String> sourceNodes;
+
+ public AbstractStream(KStreamBuilder topology, String name, Set<String> sourceNodes) {
+ this.topology = topology;
+ this.name = name;
+ this.sourceNodes = sourceNodes;
+ }
+
+ protected Set<String> ensureJoinableWith(AbstractStream<K> other) {
+ Set<String> thisSourceNodes = sourceNodes;
+ Set<String> otherSourceNodes = other.sourceNodes;
+
+ if (thisSourceNodes == null || otherSourceNodes == null)
+ throw new KafkaException("not joinable");
+
+ Set<String> allSourceNodes = new HashSet<>();
+ allSourceNodes.addAll(thisSourceNodes);
+ allSourceNodes.addAll(otherSourceNodes);
+
+ topology.copartitionSources(allSourceNodes);
+
+ return allSourceNodes;
+ }
+
+ public static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
+ return new ValueJoiner<T2, T1, R>() {
+ @Override
+ public R apply(T2 value2, T1 value1) {
+ return joiner.apply(value1, value2);
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index fc8f4c6..67a2d27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -20,8 +20,10 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamWindowed;
@@ -35,7 +37,7 @@ import java.lang.reflect.Array;
import java.util.HashSet;
import java.util.Set;
-public class KStreamImpl<K, V> implements KStream<K, V> {
+public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
private static final String FILTER_NAME = "KSTREAM-FILTER-";
@@ -65,18 +67,14 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
+ public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
+
public static final String MERGE_NAME = "KSTREAM-MERGE-";
public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
- protected final KStreamBuilder topology;
- public final String name;
- protected final Set<String> sourceNodes;
-
public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
- this.topology = topology;
- this.name = name;
- this.sourceNodes = sourceNodes;
+ super(topology, name, sourceNodes);
}
@Override
@@ -191,9 +189,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
Serializer<V> valSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V> valDeserializer) {
- String sendName = topology.newName(SINK_NAME);
-
- topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
+ to(topic, keySerializer, valSerializer);
return topology.from(keyDeserializer, valDeserializer, topic);
}
@@ -205,9 +201,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
@Override
public void to(String topic) {
- String name = topology.newName(SINK_NAME);
-
- topology.addSink(name, topic, this.name);
+ to(topic, null, null);
}
@Override
@@ -244,4 +238,17 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
topology.addProcessor(name, processorSupplier, this.name);
topology.connectProcessorAndStateStores(name, stateStoreNames);
}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+ Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+
+ String name = topology.newName(LEFTJOIN_NAME);
+
+ topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
+
+ return new KStreamImpl<>(topology, name, allSourceNodes);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
index 5e8186e..eefb8c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java
@@ -81,13 +81,4 @@ class KStreamJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> {
}
}
- public static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
- return new ValueJoiner<T2, T1, R>() {
- @Override
- public R apply(T2 value2, T1 value1) {
- return joiner.apply(value1, value2);
- }
- };
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
new file mode 100644
index 0000000..51a6277
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+class KStreamKTableLeftJoin<K, V, V1, V2> implements ProcessorSupplier<K, V1> {
+
+ private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
+ private final ValueJoiner<V1, V2, V> joiner;
+
+ KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, V> joiner) {
+ this.valueGetterSupplier = table.valueGetterSupplier();
+ this.joiner = joiner;
+ }
+
+ @Override
+ public Processor<K, V1> get() {
+ return new KStreamKTableLeftJoinProcessor(valueGetterSupplier.get());
+ }
+
+ private class KStreamKTableLeftJoinProcessor extends AbstractProcessor<K, V1> {
+
+ private final KTableValueGetter<K, V2> valueGetter;
+
+ public KStreamKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
+ this.valueGetter = valueGetter;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+ valueGetter.init(context);
+ }
+
+ @Override
+ public void process(K key, V1 value) {
+ context().forward(key, joiner.apply(value, valueGetter.get(key)));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
index 100fbee..c71c11b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java
@@ -50,7 +50,7 @@ public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implement
allSourceNodes.addAll(((KStreamWindowedImpl<K, V1>) other).sourceNodes);
KStreamJoin<K, V2, V, V1> joinThis = new KStreamJoin<>(otherWindowName, valueJoiner);
- KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner));
+ KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, reverseJoiner(valueJoiner));
KStreamPassThrough<K, V2> joinMerge = new KStreamPassThrough<>();
String joinThisName = topology.newName(JOINTHIS_NAME);
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
deleted file mode 100644
index 731d7f7..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableDerivedValueGetterSupplier.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-public abstract class KTableDerivedValueGetterSupplier<K, V1, V2> implements KTableValueGetterSupplier<K, V2> {
-
- protected final KTableValueGetterSupplier<K, V1> parentValueGetterSupplier;
-
- public KTableDerivedValueGetterSupplier(KTableValueGetterSupplier<K, V1> parentValueGetterSupplier) {
- this.parentValueGetterSupplier = parentValueGetterSupplier;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 212b1c9..f8f00b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -22,12 +22,14 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-class KTableFilter<K, V> extends KTableProcessorSupplier<K, V, V> {
+class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
+ private final KTableImpl<K, ?, V> parent;
private final Predicate<K, V> predicate;
private final boolean filterOut;
- public KTableFilter(Predicate<K, V> predicate, boolean filterOut) {
+ public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<K, V> predicate, boolean filterOut) {
+ this.parent = parent;
this.predicate = predicate;
this.filterOut = filterOut;
}
@@ -38,8 +40,11 @@ class KTableFilter<K, V> extends KTableProcessorSupplier<K, V, V> {
}
@Override
- public KTableValueGetterSupplier<K, V> view(KTableValueGetterSupplier<K, V> parentValueGetterSupplier) {
- return new KTableDerivedValueGetterSupplier<K, V, V>(parentValueGetterSupplier) {
+ public KTableValueGetterSupplier<K, V> view() {
+
+ final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
+
+ return new KTableValueGetterSupplier<K, V>() {
public KTableValueGetter<K, V> get() {
return new KTableFilterValueGetter(parentValueGetterSupplier.get());
@@ -74,10 +79,12 @@ class KTableFilter<K, V> extends KTableProcessorSupplier<K, V, V> {
this.parentGetter = parentGetter;
}
+ @Override
public void init(ProcessorContext context) {
parentGetter.init(context);
}
+ @Override
public V get(K key) {
return computeNewValue(key, parentGetter.get(key));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 47c9b09..308e4f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -23,10 +23,11 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import java.util.Collections;
+import java.util.Set;
/**
* The implementation class of KTable
@@ -34,7 +35,7 @@ import java.util.Collections;
* @param <S> the source's (parent's) value type
* @param <V> the value type
*/
-public class KTableImpl<K, S, V> implements KTable<K, V> {
+public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
private static final String FILTER_NAME = "KTABLE-FILTER-";
@@ -44,13 +45,22 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
public static final String SOURCE_NAME = "KTABLE-SOURCE-";
- protected final KStreamBuilder topology;
- public final String name;
+ public static final String SINK_NAME = "KTABLE-SINK-";
+
+ public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
+
+ public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
+
+ public static final String OUTERTHIS_NAME = "KTABLE-OUTERTHIS-";
+
+ public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
+
+ public static final String LEFTJOIN_NAME = "KTABLE-LEFTJOIN-";
+
+ public static final String MERGE_NAME = "KTABLE-MERGE-";
+
public final KTableProcessorSupplier<K, S, V> processorSupplier;
- private final String sourceNode;
- private final KTableImpl<K, ?, S> parent;
- private final String topic;
private final Serializer<K> keySerializer;
private final Serializer<V> valSerializer;
private final Deserializer<K> keyDeserializer;
@@ -59,72 +69,53 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
public KTableImpl(KStreamBuilder topology,
String name,
KTableProcessorSupplier<K, S, V> processorSupplier,
- String sourceNode,
- KTableImpl<K, ?, S> parent) {
- this(topology, name, processorSupplier, sourceNode, null, null, null, null, null, parent);
+ Set<String> sourceNodes) {
+ this(topology, name, processorSupplier, sourceNodes, null, null, null, null);
}
public KTableImpl(KStreamBuilder topology,
String name,
KTableProcessorSupplier<K, S, V> processorSupplier,
- String sourceNode,
- String topic,
+ Set<String> sourceNodes,
Serializer<K> keySerializer,
Serializer<V> valSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V> valDeserializer) {
- this(topology, name, processorSupplier, sourceNode, topic, keySerializer, valSerializer, keyDeserializer, valDeserializer, null);
- }
-
- private KTableImpl(KStreamBuilder topology,
- String name,
- KTableProcessorSupplier<K, S, V> processorSupplier,
- String sourceNode,
- String topic,
- Serializer<K> keySerializer,
- Serializer<V> valSerializer,
- Deserializer<K> keyDeserializer,
- Deserializer<V> valDeserializer,
- KTableImpl<K, ?, S> parent) {
- this.topology = topology;
- this.name = name;
+ super(topology, name, sourceNodes);
this.processorSupplier = processorSupplier;
- this.sourceNode = sourceNode;
- this.topic = topic;
this.keySerializer = keySerializer;
this.valSerializer = valSerializer;
this.keyDeserializer = keyDeserializer;
this.valDeserializer = valDeserializer;
- this.parent = parent;
}
@Override
public KTable<K, V> filter(Predicate<K, V> predicate) {
String name = topology.newName(FILTER_NAME);
- KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(predicate, false);
+ KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, false);
topology.addProcessor(name, processorSupplier, this.name);
- return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+ return new KTableImpl<>(topology, name, processorSupplier, sourceNodes);
}
@Override
public KTable<K, V> filterOut(final Predicate<K, V> predicate) {
String name = topology.newName(FILTER_NAME);
- KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(predicate, true);
+ KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, true);
topology.addProcessor(name, processorSupplier, this.name);
- return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+ return new KTableImpl<>(topology, name, processorSupplier, sourceNodes);
}
@Override
public <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper) {
String name = topology.newName(MAPVALUES_NAME);
- KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(mapper);
+ KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper);
topology.addProcessor(name, processorSupplier, this.name);
- return new KTableImpl<>(topology, name, processorSupplier, sourceNode, this);
+ return new KTableImpl<>(topology, name, processorSupplier, sourceNodes);
}
@Override
@@ -133,9 +124,7 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
Serializer<V> valSerializer,
Deserializer<K> keyDeserializer,
Deserializer<V> valDeserializer) {
- String sendName = topology.newName(KStreamImpl.SINK_NAME);
-
- topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
+ to(topic, keySerializer, valSerializer);
return topology.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic);
}
@@ -147,14 +136,12 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
@Override
public void to(String topic) {
- String name = topology.newName(KStreamImpl.SINK_NAME);
-
- topology.addSink(name, topic, this.name);
+ to(topic, null, null);
}
@Override
public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
- String name = topology.newName(KStreamImpl.SINK_NAME);
+ String name = topology.newName(SINK_NAME);
topology.addSink(name, topic, keySerializer, valSerializer, this.name);
}
@@ -165,25 +152,85 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
topology.addProcessor(name, new KStreamPassThrough(), this.name);
- return new KStreamImpl<>(topology, name, Collections.singleton(sourceNode));
+ return new KStreamImpl<>(topology, name, sourceNodes);
}
KTableValueGetterSupplier<K, V> valueGetterSupplier() {
- if (parent != null) {
- return processorSupplier.view(parent.valueGetterSupplier());
- } else {
- KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
+ if (processorSupplier instanceof KTableSource) {
+ final KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
synchronized (source) {
if (!source.isMaterialized()) {
StateStoreSupplier storeSupplier =
- new KTableStoreSupplier(topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
+ new KTableStoreSupplier<>(source.topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
// mark this state is non internal hence it is read directly from a user topic
topology.addStateStore(storeSupplier, false, name);
source.materialize();
}
}
- return new KTableSourceValueGetterSupplier<>(topic);
+ return new KTableSourceValueGetterSupplier<>(source.topic);
+ } else {
+ return processorSupplier.view();
}
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+ Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+ // One processor node per join direction, plus a merge node that both of them feed into.
+ String joinThisName = topology.newName(JOINTHIS_NAME);
+ String joinOtherName = topology.newName(JOINOTHER_NAME);
+ String joinMergeName = topology.newName(MERGE_NAME);
+ // joinOther is built with the two tables swapped, so it gets the joiner with its arguments reversed.
+ KTableKTableJoin<K, R, V, V1> joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+ KTableKTableJoin<K, R, V1, V> joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+ KTableMerge<K, R> joinMerge = new KTableMerge<>(
+ new KTableImpl<K, V, R>(topology, joinThisName, null, this.sourceNodes),
+ new KTableImpl<K, V1, R>(topology, joinOtherName, null, ((KTableImpl<K, ?, ?>) other).sourceNodes)
+ );
+ // Each direction hangs off its own input table; the merge node hangs off both directions.
+ topology.addProcessor(joinThisName, joinThis, this.name);
+ topology.addProcessor(joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).name);
+ topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+ // The result must be named after the merge node; a bare "name" is this table's own node, upstream of the join.
+ return new KTableImpl<>(topology, joinMergeName, joinThis, allSourceNodes);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+ Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+ // One processor node per join direction, plus a merge node that both of them feed into.
+ String joinThisName = topology.newName(OUTERTHIS_NAME);
+ String joinOtherName = topology.newName(OUTEROTHER_NAME);
+ String joinMergeName = topology.newName(MERGE_NAME);
+ // The outer join is assembled from two left-join processors, one per direction; the second gets the reversed joiner.
+ KTableKTableLeftJoin<K, R, V, V1> joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+ KTableKTableLeftJoin<K, R, V1, V> joinOther = new KTableKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+ KTableMerge<K, R> joinMerge = new KTableMerge<>(
+ new KTableImpl<K, V, R>(topology, joinThisName, null, this.sourceNodes),
+ new KTableImpl<K, V1, R>(topology, joinOtherName, null, ((KTableImpl<K, ?, ?>) other).sourceNodes)
+ );
+ // Each direction hangs off its own input table; the merge node hangs off both directions.
+ topology.addProcessor(joinThisName, joinThis, this.name);
+ topology.addProcessor(joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).name);
+ topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
+ // The result must be named after the merge node; a bare "name" is this table's own node, upstream of the join.
+ return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
+ Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
+
+ String name = topology.newName(LEFTJOIN_NAME);
+
+ KTableKTableLeftJoin<K, R, V, V1> leftJoin = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
+
+ topology.addProcessor(name, leftJoin, this.name);
+
+ return new KTableImpl<>(topology, name, leftJoin, allSourceNodes);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
new file mode 100644
index 0000000..058e75d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+class KTableKTableJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
+
+ private final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+ private final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+ private final ValueJoiner<V1, V2, V> joiner;
+
+ KTableKTableJoin(KTableImpl<K, ?, V1> table1,
+ KTableImpl<K, ?, V2> table2,
+ ValueJoiner<V1, V2, V> joiner) {
+ this.valueGetterSupplier1 = table1.valueGetterSupplier();
+ this.valueGetterSupplier2 = table2.valueGetterSupplier();
+ this.joiner = joiner;
+ }
+
+ @Override
+ public Processor<K, V1> get() {
+ return new KTableKTableJoinProcessor(valueGetterSupplier2.get());
+ }
+
+ @Override
+ public KTableValueGetterSupplier<K, V> view() {
+ return new KTableValueGetterSupplier<K, V>() {
+
+ public KTableValueGetter<K, V> get() {
+ return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
+ }
+
+ };
+ }
+
+ private class KTableKTableJoinProcessor extends AbstractProcessor<K, V1> {
+
+ private final KTableValueGetter<K, V2> valueGetter;
+
+ public KTableKTableJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
+ this.valueGetter = valueGetter;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+ valueGetter.init(context);
+ }
+
+ @Override
+ public void process(K key, V1 value1) {
+ V newValue = null;
+
+ if (value1 != null) {
+ V2 value2 = valueGetter.get(key);
+
+ if (value2 != null)
+ newValue = joiner.apply(value1, value2);
+ }
+
+ context().forward(key, newValue);
+ }
+ }
+
+ private class KTableKTableJoinValueGetter implements KTableValueGetter<K, V> {
+
+ private final KTableValueGetter<K, V1> valueGetter1;
+ private final KTableValueGetter<K, V2> valueGetter2;
+
+ public KTableKTableJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
+ this.valueGetter1 = valueGetter1;
+ this.valueGetter2 = valueGetter2;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ valueGetter1.init(context);
+ valueGetter2.init(context);
+ }
+
+ @Override
+ public V get(K key) {
+ V1 value1 = valueGetter1.get(key);
+
+ if (value1 != null) {
+ V2 value2 = valueGetter2.get(key);
+ return joiner.apply(value1, value2);
+ } else {
+ return null;
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
new file mode 100644
index 0000000..fe4e280
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+/**
+ * Processor supplier implementing the left join of two tables.
+ *
+ * Every non-null value of the first table is joined with whatever the second table
+ * currently holds for the same key, which may be {@code null}.
+ *
+ * @param <K>  key type shared by both tables
+ * @param <V>  type of the joined value
+ * @param <V1> value type of the first (left) table
+ * @param <V2> value type of the second (right) table
+ */
+class KTableKTableLeftJoin<K, V, V1, V2> implements KTableProcessorSupplier<K, V1, V> {
+
+    private final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+    private final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+    private final ValueJoiner<V1, V2, V> joiner;
+
+    /**
+     * @param table1 left table, whose updates drive the join processor
+     * @param table2 right table, looked up for each update of the left table
+     * @param joiner combines a left value with the (possibly null) right value
+     */
+    KTableKTableLeftJoin(KTableImpl<K, ?, V1> table1,
+                         KTableImpl<K, ?, V2> table2,
+                         ValueJoiner<V1, V2, V> joiner) {
+        this.valueGetterSupplier1 = table1.valueGetterSupplier();
+        this.valueGetterSupplier2 = table2.valueGetterSupplier();
+        this.joiner = joiner;
+    }
+
+    @Override
+    public Processor<K, V1> get() {
+        final KTableValueGetter<K, V2> rightGetter = valueGetterSupplier2.get();
+        return new KTableKTableLeftJoinProcessor(rightGetter);
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        return new KTableValueGetterSupplier<K, V>() {
+
+            @Override
+            public KTableValueGetter<K, V> get() {
+                final KTableValueGetter<K, V1> leftGetter = valueGetterSupplier1.get();
+                final KTableValueGetter<K, V2> rightGetter = valueGetterSupplier2.get();
+                return new KTableKTableLeftJoinValueGetter(leftGetter, rightGetter);
+            }
+
+        };
+    }
+
+    /**
+     * Joins each update of the left table with the current right-hand value and
+     * forwards the result downstream.
+     */
+    private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, V1> {
+
+        private final KTableValueGetter<K, V2> valueGetter;
+
+        public KTableKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
+            this.valueGetter = valueGetter;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(ProcessorContext context) {
+            super.init(context);
+            valueGetter.init(context);
+        }
+
+        /**
+         * Forwards {@code null} for a null left value; otherwise forwards the joiner's
+         * result, passing along the right-hand value even if it is {@code null}.
+         */
+        @Override
+        public void process(K key, V1 value1) {
+            final V joined = (value1 == null) ? null : joiner.apply(value1, valueGetter.get(key));
+            context().forward(key, joined);
+        }
+
+    }
+
+    /**
+     * Computes the left-joined value for a key on demand from both tables' value getters.
+     */
+    private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, V> {
+
+        private final KTableValueGetter<K, V1> valueGetter1;
+        private final KTableValueGetter<K, V2> valueGetter2;
+
+        public KTableKTableLeftJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
+            this.valueGetter1 = valueGetter1;
+            this.valueGetter2 = valueGetter2;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            valueGetter1.init(context);
+            valueGetter2.init(context);
+        }
+
+        /**
+         * @return {@code null} when the left table has no value for {@code key}; otherwise
+         *         the joiner's result for the left value and the (possibly null) right value
+         */
+        @Override
+        public V get(K key) {
+            final V1 left = valueGetter1.get(key);
+            if (left == null) {
+                return null;
+            }
+
+            final V2 right = valueGetter2.get(key);
+            return joiner.apply(left, right);
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 0d14390..300cce4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -22,11 +22,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-class KTableMapValues<K1, V1, V2> extends KTableProcessorSupplier<K1, V1, V2> {
+class KTableMapValues<K1, V1, V2> implements KTableProcessorSupplier<K1, V1, V2> {
+ private final KTableImpl<K1, ?, V1> parent;
private final ValueMapper<V1, V2> mapper;
- public KTableMapValues(ValueMapper<V1, V2> mapper) {
+ public KTableMapValues(KTableImpl<K1, ?, V1> parent, ValueMapper<V1, V2> mapper) {
+ this.parent = parent;
this.mapper = mapper;
}
@@ -36,8 +38,10 @@ class KTableMapValues<K1, V1, V2> extends KTableProcessorSupplier<K1, V1, V2> {
}
@Override
- public KTableValueGetterSupplier<K1, V2> view(KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier) {
- return new KTableDerivedValueGetterSupplier<K1, V1, V2>(parentValueGetterSupplier) {
+ public KTableValueGetterSupplier<K1, V2> view() {
+ final KTableValueGetterSupplier<K1, V1> parentValueGetterSupplier = parent.valueGetterSupplier();
+
+ return new KTableValueGetterSupplier<K1, V2>() {
public KTableValueGetter<K1, V2> get() {
return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
@@ -72,10 +76,12 @@ class KTableMapValues<K1, V1, V2> extends KTableProcessorSupplier<K1, V1, V2> {
this.parentGetter = parentGetter;
}
+ @Override
public void init(ProcessorContext context) {
parentGetter.init(context);
}
+ @Override
public V2 get(K1 key) {
return computeNewValue(parentGetter.get(key));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMerge.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMerge.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMerge.java
new file mode 100644
index 0000000..715ea6c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMerge.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+/**
+ * Processor supplier that merges the updates of several parent tables into one table.
+ *
+ * The supplied processor forwards every record unchanged. {@link #view()} exposes a
+ * value getter that consults the parents in constructor order and returns the first
+ * non-null value found for a key.
+ *
+ * @param <K> key type
+ * @param <V> value type shared by all parents and by the merged table
+ */
+class KTableMerge<K, V> implements KTableProcessorSupplier<K, V, V> {
+
+    private final KTableImpl<K, ?, V>[] parents;
+
+    /**
+     * @param parents the tables to merge; the array is copied, so later changes to the
+     *                caller's array are not observed
+     */
+    @SafeVarargs // the varargs array is only cloned and read, never written to or exposed
+    public KTableMerge(KTableImpl<K, ?, V>... parents) {
+        this.parents = parents.clone();
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KTableMergeProcessor();
+    }
+
+    // Generic arrays cannot be created directly; the raw arrays below are only ever
+    // filled with elements of the declared parameterized type.
+    @SuppressWarnings("unchecked")
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        final KTableValueGetterSupplier<K, V>[] valueGetterSuppliers = new KTableValueGetterSupplier[parents.length];
+
+        for (int i = 0; i < parents.length; i++) {
+            valueGetterSuppliers[i] = parents[i].valueGetterSupplier();
+        }
+        return new KTableValueGetterSupplier<K, V>() {
+
+            @Override
+            public KTableValueGetter<K, V> get() {
+                KTableValueGetter<K, V>[] valueGetters = new KTableValueGetter[valueGetterSuppliers.length];
+
+                for (int i = 0; i < valueGetters.length; i++) {
+                    valueGetters[i] = valueGetterSuppliers[i].get();
+                }
+                return new KTableMergeValueGetter(valueGetters);
+            }
+
+        };
+    }
+
+    /**
+     * Pass-through processor: forwards each record exactly as received.
+     * It uses the enclosing class's {@code K} and {@code V} rather than declaring
+     * its own type parameters, which would shadow them.
+     */
+    private class KTableMergeProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            context().forward(key, value);
+        }
+    }
+
+    /**
+     * Value getter over all parents; the first parent holding a non-null value wins.
+     */
+    private class KTableMergeValueGetter implements KTableValueGetter<K, V> {
+
+        private final KTableValueGetter<K, V>[] valueGetters;
+
+        public KTableMergeValueGetter(KTableValueGetter<K, V>[] valueGetters) {
+            this.valueGetters = valueGetters;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            for (KTableValueGetter<K, V> valueGetter : valueGetters) {
+                valueGetter.init(context);
+            }
+        }
+
+        /**
+         * @return the first non-null value any parent holds for {@code key}, in parent
+         *         order, or {@code null} when no parent has one
+         */
+        @Override
+        public V get(K key) {
+            for (KTableValueGetter<K, V> valueGetter : valueGetters) {
+                V value = valueGetter.get(key);
+                if (value != null)
+                    return value;
+            }
+            return null;
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
index cc6467f..2fe5c15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableProcessorSupplier.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-public abstract class KTableProcessorSupplier<K, V, T> implements ProcessorSupplier<K, V> {
+public interface KTableProcessorSupplier<K, V, T> extends ProcessorSupplier<K, V> {
- public abstract KTableValueGetterSupplier<K, T> view(KTableValueGetterSupplier<K, V> parentValueGetterFactory);
+ KTableValueGetterSupplier<K, T> view();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/991aad23/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 93790ed..60b2d5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
-public class KTableSource<K, V> extends KTableProcessorSupplier<K, V, V> {
+public class KTableSource<K, V> implements KTableProcessorSupplier<K, V, V> {
- private final String topic;
+ public final String topic;
private boolean materialized = false;
@@ -46,7 +46,7 @@ public class KTableSource<K, V> extends KTableProcessorSupplier<K, V, V> {
}
@Override
- public KTableValueGetterSupplier<K, V> view(KTableValueGetterSupplier<K, V> parentValueGetterSupplier) {
+ public KTableValueGetterSupplier<K, V> view() {
throw new IllegalStateException("a view cannot be define on the ktable source");
}