You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/07 01:49:37 UTC
[kafka] branch trunk updated: KAFKA-5660 Don't throw
TopologyBuilderException during runtime (#4645)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cf092ae KAFKA-5660 Don't throw TopologyBuilderException during runtime (#4645)
cf092ae is described below
commit cf092aeecc473b70d81c00b604e29de8c9f6d84b
Author: nafshartous <ni...@afshartous.com>
AuthorDate: Tue Mar 6 20:49:33 2018 -0500
KAFKA-5660 Don't throw TopologyBuilderException during runtime (#4645)
Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../processor/internals/ProcessorContextImpl.java | 8 ++---
.../internals/StreamsPartitionAssignor.java | 3 +-
.../org/apache/kafka/streams/TopologyTest.java | 16 ++++------
.../streams/processor/TopologyBuilderTest.java | 37 ++++++++++------------
.../internals/InternalTopologyBuilderTest.java | 17 +++++-----
5 files changed, 37 insertions(+), 44 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 317581a..42d3d70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -54,13 +55,12 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
/**
- * @throws org.apache.kafka.streams.errors.TopologyBuilderException if an attempt is made to access this state store from an unknown node
+ * @throws StreamsException if an attempt is made to access this state store from an unknown node
*/
- @SuppressWarnings("deprecation")
@Override
public StateStore getStateStore(final String name) {
if (currentNode() == null) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException("Accessing from an unknown node");
+ throw new StreamsException("Accessing from an unknown node");
}
final StateStore global = stateManager.getGlobalStore(name);
@@ -69,7 +69,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
if (!currentNode().stateStores.contains(name)) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException("Processor " + currentNode().name() + " has no access to StateStore " + name);
+ throw new StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + name);
}
return stateManager.getStore(name);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 71a84b2..0edbe2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
@@ -694,7 +695,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
continue;
}
if (numPartitions < 0) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name()));
+ throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name()));
}
topic.setNumberOfPartitions(numPartitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 992ffd8..6834091 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -252,7 +251,7 @@ public class TopologyTest {
} catch (final TopologyException expected) { }
}
- @Test(expected = TopologyBuilderException.class)
+ @Test
public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
final String sourceNodeName = "source";
final String goodNodeName = "goodGuy";
@@ -276,15 +275,12 @@ public class TopologyTest {
try {
new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder);
+ fail("Should have thrown StreamsException");
} catch (final StreamsException e) {
- final Throwable cause = e.getCause();
- if (cause != null
- && cause instanceof TopologyBuilderException
- && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
- throw (TopologyBuilderException) cause;
- } else {
- throw new RuntimeException("Did expect different exception. Did catch:", e);
- }
+ final String error = e.toString();
+ final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
+
+ assertThat(error, equalTo(expectedMessage));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 7a81594..f67b634 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -51,6 +51,7 @@ import java.util.regex.Pattern;
import static org.apache.kafka.common.utils.Utils.mkList;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -590,8 +591,7 @@ public class TopologyBuilderTest {
assertEquals("appId-foo", topicConfig.name());
}
-
- @Test(expected = TopologyBuilderException.class)
+ @Test
public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
final String sourceNodeName = "source";
final String goodNodeName = "goodGuy";
@@ -603,27 +603,24 @@ public class TopologyBuilderTest {
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
final StreamsConfig streamsConfig = new StreamsConfig(config);
- try {
- final TopologyBuilder builder = new TopologyBuilder();
- builder
- .addSource(sourceNodeName, "topic")
- .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource(sourceNodeName, "topic")
+ .addProcessor(goodNodeName, new LocalMockProcessorSupplier(),
+ sourceNodeName)
.addStateStore(
- Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
- goodNodeName)
- .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-
+ Stores.create(LocalMockProcessorSupplier.STORE_NAME)
+ .withStringKeys().withStringValues().inMemory()
+ .build(), goodNodeName)
+ .addProcessor(badNodeName, new LocalMockProcessorSupplier(),
+ sourceNodeName);
+ try {
final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder);
- driver.process("topic", null, null);
+ fail("Should have thrown StreamsException");
} catch (final StreamsException e) {
- final Throwable cause = e.getCause();
- if (cause != null
- && cause instanceof TopologyBuilderException
- && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
- throw (TopologyBuilderException) cause;
- } else {
- throw new RuntimeException("Did expect different exception. Did catch:", e);
- }
+ final String error = e.toString();
+ final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
+
+ assertThat(error, equalTo(expectedMessage));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index a39e545..901fc4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -53,7 +52,9 @@ import java.util.regex.Pattern;
import static org.apache.kafka.common.utils.Utils.mkList;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -578,17 +579,15 @@ public class InternalTopologyBuilderTest {
Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
goodNodeName);
builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-
+
try {
new ProcessorTopologyTestDriver(streamsConfig, builder);
fail("Should have throw StreamsException");
- } catch (final StreamsException expected) {
- final Throwable cause = expected.getCause();
- if (cause == null
- || !(cause instanceof TopologyBuilderException)
- || !cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
- throw new RuntimeException("Did expect different exception. Did catch:", expected);
- }
+ } catch (final StreamsException e) {
+ final String error = e.toString();
+ final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
+
+ assertThat(error, equalTo(expectedMessage));
}
}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.