You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/07/26 08:08:18 UTC

kafka git commit: MINOR: enforce setting listeners only in CREATED state.

Repository: kafka
Updated Branches:
  refs/heads/trunk e5e88f636 -> 89faed8d3


MINOR: enforce setting listeners only in CREATED state.

Author: Bill Bejeck <bi...@confluent.io>

Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>

Closes #3569 from bbejeck/MINOR_enforce_adding_listeners_only_created_state


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/89faed8d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/89faed8d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/89faed8d

Branch: refs/heads/trunk
Commit: 89faed8d30eb441fac7f1564edd40115006f5051
Parents: e5e88f6
Author: Bill Bejeck <bi...@confluent.io>
Authored: Wed Jul 26 09:08:14 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Wed Jul 26 09:08:14 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  | 26 ++++++++++++++-----
 .../apache/kafka/streams/KafkaStreamsTest.java  | 27 ++++++++++++++++++++
 2 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/89faed8d/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c7c67d5..f06a7e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -74,6 +74,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.streams.KafkaStreams.State.CREATED;
 import static org.apache.kafka.streams.KafkaStreams.State.ERROR;
 import static org.apache.kafka.streams.KafkaStreams.State.NOT_RUNNING;
 import static org.apache.kafka.streams.KafkaStreams.State.PENDING_SHUTDOWN;
@@ -228,10 +229,17 @@ public class KafkaStreams {
 
     /**
      * An app can set a single {@link KafkaStreams.StateListener} so that the app is notified when state changes.
+     *
      * @param listener a new state listener
      */
     public void setStateListener(final KafkaStreams.StateListener listener) {
-        stateListener = listener;
+        synchronized (stateLock) {
+            if (state == CREATED) {
+                stateListener = listener;
+            } else {
+                throw new IllegalStateException("Can only set StateListener in CREATED state.");
+            }
+        }
     }
 
     /**
@@ -737,12 +745,18 @@ public class KafkaStreams {
      * @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler
      */
     public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
-        for (final StreamThread thread : threads) {
-            thread.setUncaughtExceptionHandler(eh);
-        }
+        synchronized (stateLock) {
+            if (state == CREATED) {
+                for (final StreamThread thread : threads) {
+                    thread.setUncaughtExceptionHandler(eh);
+                }
 
-        if (globalStreamThread != null) {
-            globalStreamThread.setUncaughtExceptionHandler(eh);
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler(eh);
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state.");
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/89faed8d/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 467f8b8..e3443fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @Category({IntegrationTest.class})
 public class KafkaStreamsTest {
@@ -255,6 +256,32 @@ public class KafkaStreamsTest {
     }
 
     @Test
+    public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
+        streams.start();
+        try {
+            streams.setUncaughtExceptionHandler(null);
+            fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            Assert.assertEquals("Can only set UncaughtExceptionHandler in CREATED state.", e.getMessage());
+        } finally {
+            streams.close();
+        }
+    }
+
+    @Test
+    public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
+        streams.start();
+        try {
+            streams.setStateListener(null);
+            fail("Should throw IllegalStateException");
+        } catch (final IllegalStateException e) {
+            Assert.assertEquals("Can only set StateListener in CREATED state.", e.getMessage());
+        } finally {
+            streams.close();
+        }
+    }
+
+    @Test
     public void testNumberDefaultMetrics() {
         final KafkaStreams streams = createKafkaStreams();
         final Map<MetricName, ? extends Metric> metrics = streams.metrics();