You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/07/26 08:08:18 UTC
kafka git commit: MINOR: enforce setting listeners only in CREATED state.
Repository: kafka
Updated Branches:
refs/heads/trunk e5e88f636 -> 89faed8d3
MINOR: enforce setting listeners only in CREATED state.
Author: Bill Bejeck <bi...@confluent.io>
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>
Closes #3569 from bbejeck/MINOR_enforce_adding_listeners_only_created_state
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/89faed8d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/89faed8d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/89faed8d
Branch: refs/heads/trunk
Commit: 89faed8d30eb441fac7f1564edd40115006f5051
Parents: e5e88f6
Author: Bill Bejeck <bi...@confluent.io>
Authored: Wed Jul 26 09:08:14 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Wed Jul 26 09:08:14 2017 +0100
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 26 ++++++++++++++-----
.../apache/kafka/streams/KafkaStreamsTest.java | 27 ++++++++++++++++++++
2 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/89faed8d/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c7c67d5..f06a7e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -74,6 +74,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.streams.KafkaStreams.State.CREATED;
import static org.apache.kafka.streams.KafkaStreams.State.ERROR;
import static org.apache.kafka.streams.KafkaStreams.State.NOT_RUNNING;
import static org.apache.kafka.streams.KafkaStreams.State.PENDING_SHUTDOWN;
@@ -228,10 +229,17 @@ public class KafkaStreams {
/**
* An app can set a single {@link KafkaStreams.StateListener} so that the app is notified when state changes.
+ *
* @param listener a new state listener
*/
public void setStateListener(final KafkaStreams.StateListener listener) {
- stateListener = listener;
+ synchronized (stateLock) {
+ if (state == CREATED) {
+ stateListener = listener;
+ } else {
+ throw new IllegalStateException("Can only set StateListener in CREATED state.");
+ }
+ }
}
/**
@@ -737,12 +745,18 @@ public class KafkaStreams {
* @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler
*/
public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
- for (final StreamThread thread : threads) {
- thread.setUncaughtExceptionHandler(eh);
- }
+ synchronized (stateLock) {
+ if (state == CREATED) {
+ for (final StreamThread thread : threads) {
+ thread.setUncaughtExceptionHandler(eh);
+ }
- if (globalStreamThread != null) {
- globalStreamThread.setUncaughtExceptionHandler(eh);
+ if (globalStreamThread != null) {
+ globalStreamThread.setUncaughtExceptionHandler(eh);
+ }
+ } else {
+ throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state.");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/89faed8d/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 467f8b8..e3443fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
@Category({IntegrationTest.class})
public class KafkaStreamsTest {
@@ -255,6 +256,32 @@ public class KafkaStreamsTest {
}
@Test
+ public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
+ streams.start();
+ try {
+ streams.setUncaughtExceptionHandler(null);
+ fail("Should throw IllegalStateException");
+ } catch (final IllegalStateException e) {
+ Assert.assertEquals("Can only set UncaughtExceptionHandler in CREATED state.", e.getMessage());
+ } finally {
+ streams.close();
+ }
+ }
+
+ @Test
+ public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
+ streams.start();
+ try {
+ streams.setStateListener(null);
+ fail("Should throw IllegalStateException");
+ } catch (final IllegalStateException e) {
+ Assert.assertEquals("Can only set StateListener in CREATED state.", e.getMessage());
+ } finally {
+ streams.close();
+ }
+ }
+
+ @Test
public void testNumberDefaultMetrics() {
final KafkaStreams streams = createKafkaStreams();
final Map<MetricName, ? extends Metric> metrics = streams.metrics();