You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/02 21:41:22 UTC
kafka git commit: HOTFIX: Fixes to javadoc and to state store name for link joins
Repository: kafka
Updated Branches:
refs/heads/trunk c840f2a95 -> 416817920
HOTFIX: Fixes to javadoc and to state store name for link joins
Author: Eno Thereska <en...@gmail.com>
Reviewers: Damian Guy, Guozhang Wang
Closes #1674 from enothereska/hotfix-misc-joins
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41681792
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41681792
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41681792
Branch: refs/heads/trunk
Commit: 416817920bcc0a5f74ddf5231505160a68c7d2db
Parents: c840f2a
Author: Eno Thereska <en...@gmail.com>
Authored: Tue Aug 2 14:41:18 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Aug 2 14:41:18 2016 -0700
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/KStream.java | 28 +++++++++++---------
.../kstream/internals/AbstractStream.java | 4 ---
.../streams/kstream/internals/KStreamImpl.java | 8 +++---
3 files changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/41681792/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 060a1ee..4b0c185 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -373,8 +373,8 @@ public interface KStream<K, V> {
/**
* Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join.
* If a record key is null it will not be included in the resulting {@link KStream}.
- * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
- * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+ * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated store names.
+ * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
* in Kafka for each store for failure recovery, where "applicationId" is user-specified in the
* {@link org.apache.kafka.streams.StreamsConfig}.
*
@@ -404,8 +404,8 @@ public interface KStream<K, V> {
/**
* Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join
* with default serializers and deserializers. If a record key is null it will not be included in the resulting {@link KStream}.
- * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
- * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+ * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated store names.
+ * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
* in Kafka for each store for failure recovery, where "applicationId" is user-specified in the
* {@link org.apache.kafka.streams.StreamsConfig}.
*
@@ -425,8 +425,9 @@ public interface KStream<K, V> {
/**
* Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join.
* If a record key is null it will not be included in the resulting {@link KStream}.
- * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
- * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+ * Both of the joining {@link KStream}s will be materialized in local state stores with an auto-generated
+ * store name.
+ * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
* in Kafka for each store for failure recovery, where "applicationId" is user-specified in the
* {@link org.apache.kafka.streams.StreamsConfig}.
*
@@ -456,8 +457,9 @@ public interface KStream<K, V> {
/**
* Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join
* with default serializers and deserializers. If a record key is null it will not be included in the resulting {@link KStream}.
- * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
- * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+ * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
+ * store names.
+ * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
* in Kafka for each store for failure recovery, where "applicationId" is user-specified in the
* {@link org.apache.kafka.streams.StreamsConfig}.
*
@@ -478,8 +480,9 @@ public interface KStream<K, V> {
/**
* Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join.
* If a record key is null it will not be included in the resulting {@link KStream}.
- * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
- * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+ * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
+ * store names.
+ * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
* in Kafka for each store for failure recovery, where "applicationId" is user-specified in the
* {@link org.apache.kafka.streams.StreamsConfig}.
*
@@ -509,8 +512,9 @@ public interface KStream<K, V> {
/**
* Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join
* with default serializers and deserializers. If a record key is null it will not be included in the resulting {@link KStream}.
- * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
- * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+ * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
+ * store names.
+ * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
* in Kafka for each store for failure recovery, where "applicationId" is user-specified in the
* {@link org.apache.kafka.streams.StreamsConfig}.
*
http://git-wip-us.apache.org/repos/asf/kafka/blob/41681792/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index b764a6e..2f5b160 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -46,9 +45,6 @@ public abstract class AbstractStream<K> {
this.sourceNodes = sourceNodes;
}
- /**
- * @throws TopologyBuilderException if the streams are not joinable
- */
protected Set<String> ensureJoinableWith(AbstractStream<K> other) {
Set<String> allSourceNodes = new HashSet<>();
allSourceNodes.addAll(sourceNodes);
http://git-wip-us.apache.org/repos/asf/kafka/blob/41681792/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7ecbf66..1859503 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -697,16 +697,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Serde<K1> keySerde,
Serde<V1> lhsValueSerde,
Serde<V2> otherValueSerde) {
+ String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+ String joinThisName = topology.newName(LEFTJOIN_NAME);
+
StateStoreSupplier otherWindow =
- createWindowedStateStore(windows, keySerde, otherValueSerde, name + "other");
+ createWindowedStateStore(windows, keySerde, otherValueSerde, joinThisName + "-store");
KStreamJoinWindow<K1, V1>
otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
KStreamKStreamJoin<K1, R, V1, V2>
joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
- String otherWindowStreamName = topology.newName(WINDOWED_NAME);
- String joinThisName = topology.newName(LEFTJOIN_NAME);
+
topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
topology.addProcessor(joinThisName, joinThis, ((AbstractStream) lhs).name);