You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/28 19:08:45 UTC
kafka git commit: KAFKA-6274: Use topic plus dash as prefix of auto-generated state store names
Repository: kafka
Updated Branches:
refs/heads/trunk 5df1eee7d -> 1a1d923f2
KAFKA-6274: Use topic plus dash as prefix of auto-generated state store names
Use `topic-` (the topic name followed by a dash) as the prefix of the auto-generated state store name, e.g. `topic-STATE-STORE-0000000000` for a topic named `topic`.
Also add a unit test for this functionality.
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Ted Yu <yu...@gmail.com>
Closes #4268 from guozhangwang/K6274-table-source-store-name
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a1d923f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a1d923f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a1d923f
Branch: refs/heads/trunk
Commit: 1a1d923f252e9b1576dad6f7285f237feb064f64
Parents: 5df1eee
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Nov 28 11:08:42 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 28 11:08:42 2017 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/StreamsBuilder.java | 12 +++++-----
.../kafka/streams/StreamsBuilderTest.java | 24 ++++++++++++++++++++
2 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a1d923f/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 0aac45a..a94b0a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -224,7 +224,7 @@ public class StreamsBuilder {
materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
return internalStreamsBuilder.table(topic,
new ConsumedInternal<>(consumed),
- new MaterializedInternal<>(materialized, internalStreamsBuilder, topic));
+ new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
}
/**
@@ -275,7 +275,7 @@ public class StreamsBuilder {
new MaterializedInternal<>(
Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde),
internalStreamsBuilder,
- topic));
+ topic + "-"));
}
/**
@@ -300,7 +300,7 @@ public class StreamsBuilder {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
- = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic);
+ = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
return internalStreamsBuilder.table(topic,
new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
materializedInternal.valueSerde())),
@@ -331,7 +331,7 @@ public class StreamsBuilder {
new MaterializedInternal<>(
Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde),
internalStreamsBuilder,
- topic);
+ topic + "-");
return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), materialized);
@@ -399,7 +399,7 @@ public class StreamsBuilder {
materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
return internalStreamsBuilder.globalTable(topic,
new ConsumedInternal<>(consumed),
- new MaterializedInternal<>(materialized, internalStreamsBuilder, topic));
+ new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
}
/**
@@ -432,7 +432,7 @@ public class StreamsBuilder {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
- new MaterializedInternal<>(materialized, internalStreamsBuilder, topic);
+ new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
return internalStreamsBuilder.globalTable(topic,
new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
materializedInternal.valueSerde())),
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a1d923f/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 33ede93..cee01dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -36,12 +36,14 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
public class StreamsBuilderTest {
@@ -160,6 +162,28 @@ public class StreamsBuilderTest {
assertThat(store.get(1L), equalTo("value1"));
assertThat(store.get(2L), equalTo("value2"));
}
+
+ @Test
+ public void shouldUseDefaultNodeAndStoreNames() {
+ final String topic = "topic";
+ builder.table(topic,
+ Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>with(Serdes.Long(), Serdes.String()));
+
+ final Iterator<TopologyDescription.Subtopology> subtopologies = builder.build().describe().subtopologies().iterator();
+ final TopologyDescription.Subtopology subtopology = subtopologies.next();
+
+ final Iterator<TopologyDescription.Node> nodes = subtopology.nodes().iterator();
+ TopologyDescription.Node node = nodes.next();
+ assertThat(node.name(), equalTo("KSTREAM-SOURCE-0000000001"));
+ node = nodes.next();
+ assertThat(node.name(), equalTo("KTABLE-SOURCE-0000000002"));
+ final Iterator<String> stores = ((TopologyDescription.Processor) node).stores().iterator();
+ assertThat(stores.next(), equalTo(topic + "-STATE-STORE-0000000000"));
+
+ assertFalse(nodes.hasNext());
+ assertFalse(stores.hasNext());
+ assertFalse(subtopologies.hasNext());
+ }
@Test(expected = TopologyException.class)
public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {