You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/11 03:17:31 UTC

[kafka] branch 2.6 updated: KAFKA-7833: Add missing test (#8847)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 1665519  KAFKA-7833: Add missing test (#8847)
1665519 is described below

commit 1665519ae778ceaf0a89cc2d97aa5e968ea07a44
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Wed Jun 10 20:05:10 2020 -0700

    KAFKA-7833: Add missing test (#8847)
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../internals/InternalTopologyBuilder.java         |   7 +-
 .../internals/InternalTopologyBuilderTest.java     | 131 +++++++++++++++------
 2 files changed, 103 insertions(+), 35 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 8d69ef1..4f1bdc6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -680,8 +680,11 @@ public class InternalTopologyBuilder {
         if (nodeFactories.containsKey(processorName)) {
             throw new TopologyException("Processor " + processorName + " is already added.");
         }
-        if (stateFactories.containsKey(storeName) || globalStateBuilders.containsKey(storeName)) {
-            throw new TopologyException("StateStore " + storeName + " is already added.");
+        if (stateFactories.containsKey(storeName)) {
+            throw new TopologyException("A different StateStore has already been added with the name " + storeName);
+        }
+        if (globalStateBuilders.containsKey(storeName)) {
+            throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeName);
         }
         if (loggingEnabled) {
             throw new TopologyException("StateStore " + storeName + " for global table must not have logging enabled.");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 8402dfd..1e87a0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockKeyValueStoreBuilder;
@@ -62,7 +63,7 @@ public class InternalTopologyBuilderTest {
 
     private final Serde<String> stringSerde = Serdes.String();
     private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
-    private final StoreBuilder<?> storeBuilder = new MockKeyValueStoreBuilder("store", false);
+    private final StoreBuilder<?> storeBuilder = new MockKeyValueStoreBuilder("testStore", false);
 
     @Test
     public void shouldAddSourceWithOffsetReset() {
@@ -372,48 +373,112 @@ public class InternalTopologyBuilderTest {
 
     @Test
     public void shouldNotAllowToAddStoresWithSameName() {
+        final StoreBuilder<KeyValueStore<Object, Object>> otherBuilder =
+            new MockKeyValueStoreBuilder("testStore", false);
+
         builder.addStateStore(storeBuilder);
-        final StoreBuilder otherBuilder = new MockKeyValueStoreBuilder("store", false);
-        try {
-            builder.addStateStore(otherBuilder);
-            fail("Should throw TopologyException with store name conflict");
-        } catch (final TopologyException expected) { /* ok */ }
+
+        final TopologyException exception = assertThrows(
+            TopologyException.class,
+            () -> builder.addStateStore(otherBuilder)
+        );
+
+        assertThat(
+            exception.getMessage(),
+            equalTo("Invalid topology: A different StateStore has already been added with the name testStore")
+        );
     }
 
     @Test
     public void shouldNotAllowToAddStoresWithSameNameWhenFirstStoreIsGlobal() {
-        final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("store", false).withLoggingDisabled();
+        final StoreBuilder<KeyValueStore<Object, Object>> globalBuilder =
+            new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
+
         builder.addGlobalStore(
-                storeBuilder,
+            globalBuilder,
+            "global-store",
+            null,
+            null,
+            null,
+            "global-topic",
+            "global-processor",
+            new MockProcessorSupplier<>()
+        );
+
+        final TopologyException exception = assertThrows(
+            TopologyException.class,
+            () -> builder.addStateStore(storeBuilder)
+        );
+
+        assertThat(
+            exception.getMessage(),
+            equalTo("Invalid topology: A different GlobalStateStore has already been added with the name testStore")
+        );
+    }
+
+    @Test
+    public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal() {
+        final StoreBuilder<KeyValueStore<Object, Object>> globalBuilder =
+            new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
+
+        builder.addStateStore(storeBuilder);
+
+        final TopologyException exception = assertThrows(
+            TopologyException.class,
+            () -> builder.addGlobalStore(
+                globalBuilder,
                 "global-store",
                 null,
                 null,
                 null,
                 "global-topic",
                 "global-processor",
-                new MockProcessorSupplier<>());
-        try {
-            builder.addStateStore(storeBuilder);
-            fail("Should throw TopologyException with store name conflict");
-        } catch (final TopologyException expected) { /* ok */ }
+                new MockProcessorSupplier<>()
+            )
+        );
+
+        assertThat(
+            exception.getMessage(),
+            equalTo("Invalid topology: A different StateStore has already been added with the name testStore")
+        );
     }
 
     @Test
-    public void shouldNotAllowToAddStoresWithSameNameWhenSecondStoreIsGlobal() {
-        final StoreBuilder storeBuilder = new MockKeyValueStoreBuilder("store", false).withLoggingDisabled();
-        builder.addStateStore(storeBuilder);
-        try {
-            builder.addGlobalStore(
-                    storeBuilder,
-                    "global-store",
-                    null,
-                    null,
-                    null,
-                    "global-topic",
-                    "global-processor",
-                    new MockProcessorSupplier<>());
-            fail("Should throw TopologyException with store name conflict");
-        } catch (final TopologyException expected) { /* ok */ }
+    public void shouldNotAllowToAddGlobalStoresWithSameName() {
+        final StoreBuilder<KeyValueStore<Object, Object>> firstGlobalBuilder =
+            new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
+        final StoreBuilder<KeyValueStore<Object, Object>> secondGlobalBuilder =
+            new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
+
+        builder.addGlobalStore(
+            firstGlobalBuilder,
+            "global-store",
+            null,
+            null,
+            null,
+            "global-topic",
+            "global-processor",
+            new MockProcessorSupplier<>()
+        );
+
+        final TopologyException exception = assertThrows(
+            TopologyException.class,
+            () -> builder.addGlobalStore(
+                secondGlobalBuilder,
+                "global-store-2",
+                null,
+                null,
+                null,
+                "global-topic",
+                "global-processor-2",
+                new MockProcessorSupplier<>()
+            )
+        );
+
+        assertThat(
+            exception.getMessage(),
+            equalTo("Invalid topology: A different GlobalStateStore has already been added with the name testStore")
+        );
     }
 
     @Test
@@ -682,7 +747,7 @@ public class InternalTopologyBuilderTest {
         builder.addStateStore(storeBuilder, "processor");
         final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
-        assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
+        assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("testStore"));
     }
 
     @Test
@@ -692,7 +757,7 @@ public class InternalTopologyBuilderTest {
         builder.addStateStore(storeBuilder, "processor");
         final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
-        assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store"));
+        assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("testStore"));
     }
 
     @Test
@@ -704,7 +769,7 @@ public class InternalTopologyBuilderTest {
         builder.addStateStore(storeBuilder, "processor");
         final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
-        assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store"));
+        assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("testStore"));
     }
 
     @Test
@@ -754,11 +819,11 @@ public class InternalTopologyBuilderTest {
         builder.buildTopology();
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
-        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
+        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-testStore-changelog");
         final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
         assertEquals(2, properties.size());
         assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
-        assertEquals("appId-store-changelog", topicConfig.name());
+        assertEquals("appId-testStore-changelog", topicConfig.name());
         assertTrue(topicConfig instanceof UnwindowedChangelogTopicConfig);
     }