You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/11 03:17:31 UTC
[kafka] branch 2.6 updated: KAFKA-7833: Add missing test (#8847)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 1665519 KAFKA-7833: Add missing test (#8847)
1665519 is described below
commit 1665519ae778ceaf0a89cc2d97aa5e968ea07a44
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Wed Jun 10 20:05:10 2020 -0700
KAFKA-7833: Add missing test (#8847)
Reviewers: Guozhang Wang <gu...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../internals/InternalTopologyBuilder.java | 7 +-
.../internals/InternalTopologyBuilderTest.java | 131 +++++++++++++++------
2 files changed, 103 insertions(+), 35 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 8d69ef1..4f1bdc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -680,8 +680,11 @@ public class InternalTopologyBuilder {
if (nodeFactories.containsKey(processorName)) {
throw new TopologyException("Processor " + processorName + " is already added.");
}
- if (stateFactories.containsKey(storeName) || globalStateBuilders.containsKey(storeName)) {
- throw new TopologyException("StateStore " + storeName + " is already added.");
+ if (stateFactories.containsKey(storeName)) {
+ throw new TopologyException("A different StateStore has already been added with the name " + storeName);
+ }
+ if (globalStateBuilders.containsKey(storeName)) {
+ throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeName);
}
if (loggingEnabled) {
throw new TopologyException("StateStore " + storeName + " for global table must not have logging enabled.");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 8402dfd..1e87a0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
@@ -62,7 +63,7 @@ public class InternalTopologyBuilderTest {
private final Serde<String> stringSerde = Serdes.String();
private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
- private final StoreBuilder<?> storeBuilder = new MockKeyValueStoreBuilder("store", false);
+ private final StoreBuilder<?> storeBuilder = new MockKeyValueStoreBuilder("testStore", false);
@Test
public void shouldAddSourceWithOffsetReset() {
@@ -372,48 +373,112 @@ public class InternalTopologyBuilderTest {
@Test
public void shouldNotAllowToAddStoresWithSameName() {
+ final StoreBuilder<KeyValueStore<Object, Object>> otherBuilder =
+ new MockKeyValueStoreBuilder("testStore", false);
+
builder.addStateStore(storeBuilder);
- final StoreBuilder otherBuilder = new MockKeyValueStoreBuilder("store", false);
- try {
- builder.addStateStore(otherBuilder);
- fail("Should throw TopologyException with store name conflict");
- } catch (final TopologyException expected) { /* ok */ }
+
+ final TopologyException exception = assertThrows(
+ TopologyException.class,
+ () -> builder.addStateStore(otherBuilder)
+ );
+
+ assertThat(
+ exception.getMessage(),
+ equalTo("Invalid topology: A different StateStore has already been added with the name testStore")
+ );
}
@Test
public void shouldNotAllowToAddStoresWithSameNameWhenFirstStoreIsGlobal() {
- final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("store", false).withLoggingDisabled();
+ final StoreBuilder<KeyValueStore<Object, Object>> globalBuilder =
+ new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
+
builder.addGlobalStore(
- storeBuilder,
+ globalBuilder,
+ "global-store",
+ null,
+ null,
+ null,
+ "global-topic",
+ "global-processor",
+ new MockProcessorSupplier<>()
+ );
+
+ final TopologyException exception = assertThrows(
+ TopologyException.class,
+ () -> builder.addStateStore(storeBuilder)
+ );
+
+ assertThat(
+ exception.getMessage(),
+ equalTo("Invalid topology: A different GlobalStateStore has already been added with the name testStore")
+ );
+ }
+
+ @Test
+ public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal() {
+ final StoreBuilder<KeyValueStore<Object, Object>> globalBuilder =
+ new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
+
+ builder.addStateStore(storeBuilder);
+
+ final TopologyException exception = assertThrows(
+ TopologyException.class,
+ () -> builder.addGlobalStore(
+ globalBuilder,
"global-store",
null,
null,
null,
"global-topic",
"global-processor",
- new MockProcessorSupplier<>());
- try {
- builder.addStateStore(storeBuilder);
- fail("Should throw TopologyException with store name conflict");
- } catch (final TopologyException expected) { /* ok */ }
+ new MockProcessorSupplier<>()
+ )
+ );
+
+ assertThat(
+ exception.getMessage(),
+ equalTo("Invalid topology: A different StateStore has already been added with the name testStore")
+ );
}
@Test
- public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal() {
- final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("store", false).withLoggingDisabled();
- builder.addStateStore(storeBuilder);
- try {
- builder.addGlobalStore(
- storeBuilder,
- "global-store",
- null,
- null,
- null,
- "global-topic",
- "global-processor",
- new MockProcessorSupplier<>());
- fail("Should throw TopologyException with store name conflict");
- } catch (final TopologyException expected) { /* ok */ }
+ public void shouldNotAllowToAddGlobalStoresWithSameName() {
+ final StoreBuilder<KeyValueStore<Object, Object>> firstGlobalBuilder =
+ new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
+ final StoreBuilder<KeyValueStore<Object, Object>> secondGlobalBuilder =
+ new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
+
+ builder.addGlobalStore(
+ firstGlobalBuilder,
+ "global-store",
+ null,
+ null,
+ null,
+ "global-topic",
+ "global-processor",
+ new MockProcessorSupplier<>()
+ );
+
+ final TopologyException exception = assertThrows(
+ TopologyException.class,
+ () -> builder.addGlobalStore(
+ secondGlobalBuilder,
+ "global-store-2",
+ null,
+ null,
+ null,
+ "global-topic",
+ "global-processor-2",
+ new MockProcessorSupplier<>()
+ )
+ );
+
+ assertThat(
+ exception.getMessage(),
+ equalTo("Invalid topology: A different GlobalStateStore has already been added with the name testStore")
+ );
}
@Test
@@ -682,7 +747,7 @@ public class InternalTopologyBuilderTest {
builder.addStateStore(storeBuilder, "processor");
final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
assertEquals(1, stateStoreNameToSourceTopic.size());
- assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
+ assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("testStore"));
}
@Test
@@ -692,7 +757,7 @@ public class InternalTopologyBuilderTest {
builder.addStateStore(storeBuilder, "processor");
final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
assertEquals(1, stateStoreNameToSourceTopic.size());
- assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
+ assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("testStore"));
}
@Test
@@ -704,7 +769,7 @@ public class InternalTopologyBuilderTest {
builder.addStateStore(storeBuilder, "processor");
final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
assertEquals(1, stateStoreNameToSourceTopic.size());
- assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
+ assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("testStore"));
}
@Test
@@ -754,11 +819,11 @@ public class InternalTopologyBuilderTest {
builder.buildTopology();
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
- final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
+ final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-testStore-changelog");
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
assertEquals(2, properties.size());
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
- assertEquals("appId-store-changelog", topicConfig.name());
+ assertEquals("appId-testStore-changelog", topicConfig.name());
assertTrue(topicConfig instanceof UnwindowedChangelogTopicConfig);
}