You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/02 21:41:22 UTC

kafka git commit: HOTFIX: Fixes to javadoc and to state store name for link joins

Repository: kafka
Updated Branches:
  refs/heads/trunk c840f2a95 -> 416817920


HOTFIX: Fixes to javadoc and to state store name for link joins

Author: Eno Thereska <en...@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #1674 from enothereska/hotfix-misc-joins


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41681792
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41681792
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41681792

Branch: refs/heads/trunk
Commit: 416817920bcc0a5f74ddf5231505160a68c7d2db
Parents: c840f2a
Author: Eno Thereska <en...@gmail.com>
Authored: Tue Aug 2 14:41:18 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Aug 2 14:41:18 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java   | 28 +++++++++++---------
 .../kstream/internals/AbstractStream.java       |  4 ---
 .../streams/kstream/internals/KStreamImpl.java  |  8 +++---
 3 files changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/41681792/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 060a1ee..4b0c185 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -373,8 +373,8 @@ public interface KStream<K, V> {
     /**
      * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join.
      * If a record key is null it will not included in the resulting {@link KStream}
-     * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
-     * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+     * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated store names.
+     * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
      * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
      * {@link org.apache.kafka.streams.StreamsConfig}.
      *
@@ -404,8 +404,8 @@ public interface KStream<K, V> {
     /**
      * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join
      * with default serializers and deserializers. If a record key is null it will not included in the resulting {@link KStream}
-     * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
-     * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+     * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated store names.
+     * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
      * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
      * {@link org.apache.kafka.streams.StreamsConfig}.
      *
@@ -425,8 +425,9 @@ public interface KStream<K, V> {
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join.
      * If a record key is null it will not included in the resulting {@link KStream}
-     * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
-     * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+     * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
+     * store names.
+     * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
      * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
      * {@link org.apache.kafka.streams.StreamsConfig}.
      *
@@ -456,8 +457,9 @@ public interface KStream<K, V> {
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join
      * with default serializers and deserializers. If a record key is null it will not included in the resulting {@link KStream}
-     * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
-     * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+     * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
+     * store names.
+     * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
      * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
      * {@link org.apache.kafka.streams.StreamsConfig}.
      *
@@ -478,8 +480,9 @@ public interface KStream<K, V> {
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join.
      * If a record key is null it will not included in the resulting {@link KStream}
-     * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
-     * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+     * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
+     * store names.
+     * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
      * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
      * {@link org.apache.kafka.streams.StreamsConfig}.
      *
@@ -509,8 +512,9 @@ public interface KStream<K, V> {
     /**
      * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join
      * with default serializers and deserializers. If a record key is null it will not included in the resulting {@link KStream}
-     * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
-     * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+     * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
+     * store names.
+     * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
      * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
      * {@link org.apache.kafka.streams.StreamsConfig}.
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/41681792/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index b764a6e..2f5b160 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 
@@ -46,9 +45,6 @@ public abstract class AbstractStream<K> {
         this.sourceNodes = sourceNodes;
     }
 
-    /**
-     * @throws TopologyBuilderException if the streams are not joinable
-     */
     protected Set<String> ensureJoinableWith(AbstractStream<K> other) {
         Set<String> allSourceNodes = new HashSet<>();
         allSourceNodes.addAll(sourceNodes);

http://git-wip-us.apache.org/repos/asf/kafka/blob/41681792/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7ecbf66..1859503 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -697,16 +697,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                    Serde<K1> keySerde,
                                                    Serde<V1> lhsValueSerde,
                                                    Serde<V2> otherValueSerde) {
+            String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+            String joinThisName = topology.newName(LEFTJOIN_NAME);
+
             StateStoreSupplier otherWindow =
-                createWindowedStateStore(windows, keySerde, otherValueSerde, name + "other");
+                createWindowedStateStore(windows, keySerde, otherValueSerde, joinThisName + "-store");
 
             KStreamJoinWindow<K1, V1>
                 otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
             KStreamKStreamJoin<K1, R, V1, V2>
                 joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
 
-            String otherWindowStreamName = topology.newName(WINDOWED_NAME);
-            String joinThisName = topology.newName(LEFTJOIN_NAME);
+
 
             topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
             topology.addProcessor(joinThisName, joinThis, ((AbstractStream) lhs).name);