You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/28 19:08:45 UTC

kafka git commit: KAFKA-6274: Use topic plus dash as prefix of auto-generated state store names

Repository: kafka
Updated Branches:
  refs/heads/trunk 5df1eee7d -> 1a1d923f2


KAFKA-6274: Use topic plus dash as prefix of auto-generated state store names

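Use the topic name followed by a dash (`<topic>-`) as the prefix of the auto-generated state store name, e.g. `topic-STATE-STORE-0000000000` for a topic named `topic`.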

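Also add a unit test (`StreamsBuilderTest#shouldUseDefaultNodeAndStoreNames`) that checks the default node names and the auto-generated state store name.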

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Ted Yu <yu...@gmail.com>

Closes #4268 from guozhangwang/K6274-table-source-store-name


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a1d923f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a1d923f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a1d923f

Branch: refs/heads/trunk
Commit: 1a1d923f252e9b1576dad6f7285f237feb064f64
Parents: 5df1eee
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Nov 28 11:08:42 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 28 11:08:42 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/StreamsBuilder.java    | 12 +++++-----
 .../kafka/streams/StreamsBuilderTest.java       | 24 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1a1d923f/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 0aac45a..a94b0a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -224,7 +224,7 @@ public class StreamsBuilder {
         materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
         return internalStreamsBuilder.table(topic,
                                             new ConsumedInternal<>(consumed),
-                                            new MaterializedInternal<>(materialized, internalStreamsBuilder, topic));
+                                            new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
     }
 
     /**
@@ -275,7 +275,7 @@ public class StreamsBuilder {
                                             new MaterializedInternal<>(
                                                     Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde),
                                                     internalStreamsBuilder,
-                                                    topic));
+                                                    topic + "-"));
     }
 
     /**
@@ -300,7 +300,7 @@ public class StreamsBuilder {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
-                = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic);
+                = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
         return internalStreamsBuilder.table(topic,
                                             new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
                                                                                  materializedInternal.valueSerde())),
@@ -331,7 +331,7 @@ public class StreamsBuilder {
                 new MaterializedInternal<>(
                         Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde),
                         internalStreamsBuilder,
-                        topic);
+                        topic + "-");
 
 
         return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), materialized);
@@ -399,7 +399,7 @@ public class StreamsBuilder {
         materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
         return internalStreamsBuilder.globalTable(topic,
                                                   new ConsumedInternal<>(consumed),
-                                                  new MaterializedInternal<>(materialized, internalStreamsBuilder, topic));
+                                                  new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
     }
 
     /**
@@ -432,7 +432,7 @@ public class StreamsBuilder {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
-                new MaterializedInternal<>(materialized, internalStreamsBuilder, topic);
+                new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
         return internalStreamsBuilder.globalTable(topic,
                                                   new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
                                                                                        materializedInternal.valueSerde())),

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a1d923f/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 33ede93..cee01dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -36,12 +36,14 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 public class StreamsBuilderTest {
 
@@ -160,6 +162,28 @@ public class StreamsBuilderTest {
         assertThat(store.get(1L), equalTo("value1"));
         assertThat(store.get(2L), equalTo("value2"));
     }
+
+    @Test
+    public void shouldUseDefaultNodeAndStoreNames() {
+        final String topic = "topic";
+        builder.table(topic,
+                Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>with(Serdes.Long(), Serdes.String()));
+
+        final Iterator<TopologyDescription.Subtopology> subtopologies = builder.build().describe().subtopologies().iterator();
+        final TopologyDescription.Subtopology subtopology = subtopologies.next();
+
+        final Iterator<TopologyDescription.Node> nodes = subtopology.nodes().iterator();
+        TopologyDescription.Node node = nodes.next();
+        assertThat(node.name(), equalTo("KSTREAM-SOURCE-0000000001"));
+        node = nodes.next();
+        assertThat(node.name(), equalTo("KTABLE-SOURCE-0000000002"));
+        final Iterator<String> stores = ((TopologyDescription.Processor) node).stores().iterator();
+        assertThat(stores.next(), equalTo(topic + "-STATE-STORE-0000000000"));
+
+        assertFalse(nodes.hasNext());
+        assertFalse(stores.hasNext());
+        assertFalse(subtopologies.hasNext());
+    }
     
     @Test(expected = TopologyException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {