You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/07 01:49:37 UTC

[kafka] branch trunk updated: KAFKA-5660 Don't throw TopologyBuilderException during runtime (#4645)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cf092ae  KAFKA-5660 Don't throw TopologyBuilderException during runtime (#4645)
cf092ae is described below

commit cf092aeecc473b70d81c00b604e29de8c9f6d84b
Author: nafshartous <ni...@afshartous.com>
AuthorDate: Tue Mar 6 20:49:33 2018 -0500

    KAFKA-5660 Don't throw TopologyBuilderException during runtime (#4645)
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../processor/internals/ProcessorContextImpl.java  |  8 ++---
 .../internals/StreamsPartitionAssignor.java        |  3 +-
 .../org/apache/kafka/streams/TopologyTest.java     | 16 ++++------
 .../streams/processor/TopologyBuilderTest.java     | 37 ++++++++++------------
 .../internals/InternalTopologyBuilderTest.java     | 17 +++++-----
 5 files changed, 37 insertions(+), 44 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 317581a..42d3d70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -54,13 +55,12 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     }
 
     /**
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if an attempt is made to access this state store from an unknown node
+     * @throws StreamsException if an attempt is made to access this state store from an unknown node
      */
-    @SuppressWarnings("deprecation")
     @Override
     public StateStore getStateStore(final String name) {
         if (currentNode() == null) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException("Accessing from an unknown node");
+            throw new StreamsException("Accessing from an unknown node");
         }
 
         final StateStore global = stateManager.getGlobalStore(name);
@@ -69,7 +69,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         if (!currentNode().stateStores.contains(name)) {
-            throw new org.apache.kafka.streams.errors.TopologyBuilderException("Processor " + currentNode().name() + " has no access to StateStore " + name);
+            throw new StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + name);
         }
 
         return stateManager.getStore(name);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 71a84b2..0edbe2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
@@ -694,7 +695,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                 continue;
             }
             if (numPartitions < 0) {
-                throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name()));
+                throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name()));
             }
 
             topic.setNumberOfPartitions(numPartitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 992ffd8..6834091 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -252,7 +251,7 @@ public class TopologyTest {
         } catch (final TopologyException expected) { }
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test
     public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
@@ -276,15 +275,12 @@ public class TopologyTest {
 
         try {
             new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder);
+            fail("Should have thrown StreamsException");
         } catch (final StreamsException e) {
-            final Throwable cause = e.getCause();
-            if (cause != null
-                && cause instanceof TopologyBuilderException
-                && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
-                throw (TopologyBuilderException) cause;
-            } else {
-                throw new RuntimeException("Did expect different exception. Did catch:", e);
-            }
+            final String error = e.toString();
+            final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
+            
+            assertThat(error, equalTo(expectedMessage));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 7a81594..f67b634 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -51,6 +51,7 @@ import java.util.regex.Pattern;
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -590,8 +591,7 @@ public class TopologyBuilderTest {
         assertEquals("appId-foo", topicConfig.name());
     }
 
-
-    @Test(expected = TopologyBuilderException.class)
+    @Test
     public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
@@ -603,27 +603,24 @@ public class TopologyBuilderTest {
         config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         final StreamsConfig streamsConfig = new StreamsConfig(config);
 
-        try {
-            final TopologyBuilder builder = new TopologyBuilder();
-            builder
-                .addSource(sourceNodeName, "topic")
-                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource(sourceNodeName, "topic")
+                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(),
+                        sourceNodeName)
                 .addStateStore(
-                    Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
-                    goodNodeName)
-                .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-
+                        Stores.create(LocalMockProcessorSupplier.STORE_NAME)
+                                .withStringKeys().withStringValues().inMemory()
+                                .build(), goodNodeName)
+                .addProcessor(badNodeName, new LocalMockProcessorSupplier(),
+                        sourceNodeName);     
+        try {
             final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder);
-            driver.process("topic", null, null);
+            fail("Should have thrown StreamsException");
         } catch (final StreamsException e) {
-            final Throwable cause = e.getCause();
-            if (cause != null
-                && cause instanceof TopologyBuilderException
-                && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
-                throw (TopologyBuilderException) cause;
-            } else {
-                throw new RuntimeException("Did expect different exception. Did catch:", e);
-            }
+            final String error = e.toString();
+            final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
+
+            assertThat(error, equalTo(expectedMessage));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index a39e545..901fc4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -53,7 +52,9 @@ import java.util.regex.Pattern;
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -578,17 +579,15 @@ public class InternalTopologyBuilderTest {
             Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
             goodNodeName);
         builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-
+        
         try {
             new ProcessorTopologyTestDriver(streamsConfig, builder);
             fail("Should have throw StreamsException");
-        } catch (final StreamsException expected) {
-            final Throwable cause = expected.getCause();
-            if (cause == null
-                || !(cause instanceof TopologyBuilderException)
-                || !cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
-                throw new RuntimeException("Did expect different exception. Did catch:", expected);
-            }
+        } catch (final StreamsException e) {
+            final String error = e.toString();
+            final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
+            
+            assertThat(error, equalTo(expectedMessage));
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.