You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/05/13 04:14:55 UTC

kafka git commit: MINOR: Fix bugs in KafkaStreams.close()

Repository: kafka
Updated Branches:
  refs/heads/trunk 81f76bde8 -> f34164eed


MINOR: Fix bugs in KafkaStreams.close()

Initially proposed by ijuma in https://github.com/apache/kafka/pull/1362#issuecomment-218293662

mjsax commented:

> StreamThread.close() should be extended to call metrics.close() (the class needs a private member to reference the Metrics object, too)

The `Metrics` instance is created in the `KafkaStreams` constructor and shared between all threads, so closing it within the threads doesn't seem like the right approach. This PR calls `Metrics.close()` in `KafkaStreams.close()` instead.

cc guozhangwang

Author: Jeff Klukas <je...@klukas.net>

Reviewers: Ismael Juma, Guozhang Wang

Closes #1379 from jklukas/close-streams-metrics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f34164ee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f34164ee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f34164ee

Branch: refs/heads/trunk
Commit: f34164eed53d791768f05df21f4dfeca89859b2e
Parents: 81f76bd
Author: Jeff Klukas <je...@klukas.net>
Authored: Thu May 12 21:14:51 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu May 12 21:14:51 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  14 ++-
 .../apache/kafka/streams/KafkaStreamsTest.java  | 106 +++++++++++++++++++
 2 files changed, 115 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f34164ee/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b3e3f5d..af6d973 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -91,6 +91,7 @@ public class KafkaStreams {
     private int state = CREATED;
 
     private final StreamThread[] threads;
+    private final Metrics metrics;
 
     // processId is expected to be unique across JVMs and to be used
     // in userData of the subscription request to allow assignor be aware
@@ -147,7 +148,7 @@ public class KafkaStreams {
             .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                 TimeUnit.MILLISECONDS);
 
-        Metrics metrics = new Metrics(metricConfig, reporters, time);
+        this.metrics = new Metrics(metricConfig, reporters, time);
 
         this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         for (int i = 0; i < this.threads.length; i++) {
@@ -169,8 +170,10 @@ public class KafkaStreams {
             state = RUNNING;
 
             log.info("Started Kafka Stream process");
-        } else {
+        } else if (state == RUNNING) {
             throw new IllegalStateException("This process was already started.");
+        } else {
+            throw new IllegalStateException("Cannot restart after closing.");
         }
     }
 
@@ -194,13 +197,14 @@ public class KafkaStreams {
                     Thread.interrupted();
                 }
             }
+        }
 
+        if (state != STOPPED) {
+            metrics.close();
             state = STOPPED;
-
             log.info("Stopped Kafka Stream process");
-        } else {
-            throw new IllegalStateException("This process has not started yet.");
         }
+
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f34164ee/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
new file mode 100644
index 0000000..22d8bf2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.MockMetricsReporter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class KafkaStreamsTest {
+
+    @Test
+    public void testStartAndClose() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+        final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
+        final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
+
+        KStreamBuilder builder = new KStreamBuilder();
+        KafkaStreams streams = new KafkaStreams(builder, props);
+
+        streams.start();
+        final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
+        final int initCountDifference = newInitCount - oldInitCount;
+        Assert.assertTrue("some reporters should be initialized by calling start()", initCountDifference > 0);
+
+        streams.close();
+        Assert.assertEquals("each reporter initialized should also be closed",
+                oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get());
+    }
+
+    @Test
+    public void testCloseIsIdempotent() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+        KStreamBuilder builder = new KStreamBuilder();
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.close();
+        final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
+
+        streams.close();
+        Assert.assertEquals("subsequent close() calls should do nothing",
+                closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+    }
+
+    @Test
+    public void testCannotStartOnceClosed() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        KStreamBuilder builder = new KStreamBuilder();
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.close();
+
+        try {
+            streams.start();
+        } catch (IllegalStateException e) {
+            Assert.assertEquals("Cannot restart after closing.", e.getMessage());
+            return;
+        }
+        Assert.fail("should have caught an exception and returned");
+    }
+
+    @Test
+    public void testCannotStartTwice() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        KStreamBuilder builder = new KStreamBuilder();
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
+
+        try {
+            streams.start();
+        } catch (IllegalStateException e) {
+            Assert.assertEquals("This process was already started.", e.getMessage());
+            return;
+        }
+        Assert.fail("should have caught an exception and returned");
+    }
+}