You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/05/13 04:14:55 UTC
kafka git commit: MINOR: Fix bugs in KafkaStreams.close()
Repository: kafka
Updated Branches:
refs/heads/trunk 81f76bde8 -> f34164eed
MINOR: Fix bugs in KafkaStreams.close()
Initially proposed by ijuma in https://github.com/apache/kafka/pull/1362#issuecomment-218293662
mjsax commented:
> StreamThread.close() should be extended to call metrics.close() (the class needs a private member to reference the Metrics object, too)
The `Metrics` instance is created in the `KafkaStreams` constructor and shared between all threads, so closing it within the threads doesn't seem like the right approach. This PR calls `Metrics.close()` in `KafkaStreams.close()` instead.
cc guozhangwang
Author: Jeff Klukas <je...@klukas.net>
Reviewers: Ismael Juma, Guozhang Wang
Closes #1379 from jklukas/close-streams-metrics
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f34164ee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f34164ee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f34164ee
Branch: refs/heads/trunk
Commit: f34164eed53d791768f05df21f4dfeca89859b2e
Parents: 81f76bd
Author: Jeff Klukas <je...@klukas.net>
Authored: Thu May 12 21:14:51 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu May 12 21:14:51 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 14 ++-
.../apache/kafka/streams/KafkaStreamsTest.java | 106 +++++++++++++++++++
2 files changed, 115 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f34164ee/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b3e3f5d..af6d973 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -91,6 +91,7 @@ public class KafkaStreams {
private int state = CREATED;
private final StreamThread[] threads;
+ private final Metrics metrics;
// processId is expected to be unique across JVMs and to be used
// in userData of the subscription request to allow assignor be aware
@@ -147,7 +148,7 @@ public class KafkaStreams {
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
- Metrics metrics = new Metrics(metricConfig, reporters, time);
+ this.metrics = new Metrics(metricConfig, reporters, time);
this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
for (int i = 0; i < this.threads.length; i++) {
@@ -169,8 +170,10 @@ public class KafkaStreams {
state = RUNNING;
log.info("Started Kafka Stream process");
- } else {
+ } else if (state == RUNNING) {
throw new IllegalStateException("This process was already started.");
+ } else {
+ throw new IllegalStateException("Cannot restart after closing.");
}
}
@@ -194,13 +197,14 @@ public class KafkaStreams {
Thread.interrupted();
}
}
+ }
+ if (state != STOPPED) {
+ metrics.close();
state = STOPPED;
-
log.info("Stopped Kafka Stream process");
- } else {
- throw new IllegalStateException("This process has not started yet.");
}
+
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/f34164ee/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
new file mode 100644
index 0000000..22d8bf2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.MockMetricsReporter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class KafkaStreamsTest {
+
+ @Test
+ public void testStartAndClose() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+ final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
+ final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
+
+ KStreamBuilder builder = new KStreamBuilder();
+ KafkaStreams streams = new KafkaStreams(builder, props);
+
+ streams.start();
+ final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
+ final int initCountDifference = newInitCount - oldInitCount;
+ Assert.assertTrue("some reporters should be initialized by calling start()", initCountDifference > 0);
+
+ streams.close();
+ Assert.assertEquals("each reporter initialized should also be closed",
+ oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get());
+ }
+
+ @Test
+ public void testCloseIsIdempotent() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+ KStreamBuilder builder = new KStreamBuilder();
+ KafkaStreams streams = new KafkaStreams(builder, props);
+ streams.close();
+ final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
+
+ streams.close();
+ Assert.assertEquals("subsequent close() calls should do nothing",
+ closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+ }
+
+ @Test
+ public void testCannotStartOnceClosed() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+ KStreamBuilder builder = new KStreamBuilder();
+ KafkaStreams streams = new KafkaStreams(builder, props);
+ streams.close();
+
+ try {
+ streams.start();
+ } catch (IllegalStateException e) {
+ Assert.assertEquals("Cannot restart after closing.", e.getMessage());
+ return;
+ }
+ Assert.fail("should have caught an exception and returned");
+ }
+
+ @Test
+ public void testCannotStartTwice() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+ KStreamBuilder builder = new KStreamBuilder();
+ KafkaStreams streams = new KafkaStreams(builder, props);
+ streams.start();
+
+ try {
+ streams.start();
+ } catch (IllegalStateException e) {
+ Assert.assertEquals("This process was already started.", e.getMessage());
+ return;
+ }
+ Assert.fail("should have caught an exception and returned");
+ }
+}