You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/07 20:13:31 UTC
kafka git commit: HOTFIX: recreate state.dir after cleanup
Repository: kafka
Updated Branches:
refs/heads/trunk 454f6845b -> e876df8b3
HOTFIX: recreate state.dir after cleanup
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #1940 from mjsax/hotfix-resetTool
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e876df8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e876df8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e876df8b
Branch: refs/heads/trunk
Commit: e876df8b37fc6ea54b0a0571306c4a833c919cda
Parents: 454f684
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Oct 7 13:13:27 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 7 13:13:27 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 86 ++++++++++----------
.../apache/kafka/streams/KafkaStreamsTest.java | 49 ++---------
.../integration/ResetIntegrationTest.java | 2 +-
3 files changed, 50 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e876df8b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d88d09e..2333db7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -25,18 +25,20 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
-import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
-import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
@@ -47,9 +49,6 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
* sends output to zero or more output topics.
@@ -110,7 +109,7 @@ public class KafkaStreams {
// of the co-location of stream thread's consumers. It is for internal
// usage only and should not be exposed to users at all.
private final UUID processId;
- private StreamsMetadataState streamsMetadataState;
+ private final StreamsMetadataState streamsMetadataState;
private final StreamsConfig config;
@@ -146,7 +145,7 @@ public class KafkaStreams {
// create the metrics
final Time time = new SystemTime();
- this.processId = UUID.randomUUID();
+ processId = UUID.randomUUID();
this.config = config;
@@ -167,24 +166,24 @@ public class KafkaStreams {
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
- this.metrics = new Metrics(metricConfig, reporters, time);
+ metrics = new Metrics(metricConfig, reporters, time);
- this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+ threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
streamsMetadataState = new StreamsMetadataState(builder);
- for (int i = 0; i < this.threads.length; i++) {
- this.threads[i] = new StreamThread(builder,
- config,
- clientSupplier,
- applicationId,
- clientId,
- processId,
- metrics,
- time,
- streamsMetadataState);
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new StreamThread(builder,
+ config,
+ clientSupplier,
+ applicationId,
+ clientId,
+ processId,
+ metrics,
+ time,
+ streamsMetadataState);
storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}
- this.queryableStoreProvider = new QueryableStoreProvider(storeProviders);
+ queryableStoreProvider = new QueryableStoreProvider(storeProviders);
}
/**
@@ -195,14 +194,14 @@ public class KafkaStreams {
public synchronized void start() {
log.debug("Starting Kafka Stream process");
- if (this.state == CREATED) {
- for (final StreamThread thread : this.threads)
+ if (state == CREATED) {
+ for (final StreamThread thread : threads)
thread.start();
- this.state = RUNNING;
+ state = RUNNING;
log.info("Started Kafka Stream process");
- } else if (this.state == RUNNING) {
+ } else if (state == RUNNING) {
throw new IllegalStateException("This process was already started.");
} else {
throw new IllegalStateException("Cannot restart after closing.");
@@ -218,12 +217,12 @@ public class KafkaStreams {
public synchronized void close() {
log.debug("Stopping Kafka Stream process");
- if (this.state == RUNNING) {
+ if (state == RUNNING) {
// signal the threads to stop and wait
- for (final StreamThread thread : this.threads)
+ for (final StreamThread thread : threads)
thread.close();
- for (final StreamThread thread : this.threads) {
+ for (final StreamThread thread : threads) {
try {
thread.join();
} catch (final InterruptedException ex) {
@@ -232,9 +231,9 @@ public class KafkaStreams {
}
}
- if (this.state != STOPPED) {
- this.metrics.close();
- this.state = STOPPED;
+ if (state != STOPPED) {
+ metrics.close();
+ state = STOPPED;
log.info("Stopped Kafka Stream process");
}
@@ -247,9 +246,9 @@ public class KafkaStreams {
* @return A string representation of the Kafka Streams instance.
*/
public String toString() {
- StringBuilder sb = new StringBuilder("KafkaStreams processID:" + this.processId + "\n");
- for (int i = 0; i < this.threads.length; i++) {
- sb.append("\t" + this.threads[i].toString());
+ final StringBuilder sb = new StringBuilder("KafkaStreams processID:" + processId + "\n");
+ for (final StreamThread thread : threads) {
+ sb.append("\t").append(thread.toString());
}
sb.append("\n");
@@ -264,19 +263,20 @@ public class KafkaStreams {
* @throws IllegalStateException if instance is currently running
*/
public void cleanUp() {
- if (this.state == RUNNING) {
+ if (state == RUNNING) {
throw new IllegalStateException("Cannot clean up while running.");
}
- final String localApplicationDir = this.config.getString(StreamsConfig.STATE_DIR_CONFIG)
- + File.separator
- + this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+ final String appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+ final String stateDir = config.getString(StreamsConfig.STATE_DIR_CONFIG);
- log.debug("Clean up local Kafka Streams data in {}", localApplicationDir);
+ final String localApplicationDir = stateDir + File.separator + appId;
log.debug("Removing local Kafka Streams application data in {} for application {}",
localApplicationDir,
- this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
- Utils.delete(new File(localApplicationDir));
+ appId);
+
+ final StateDirectory stateDirectory = new StateDirectory(appId, stateDir);
+ stateDirectory.cleanRemovedTasks();
}
/**
@@ -285,7 +285,7 @@ public class KafkaStreams {
* @param eh the object to use as this thread's uncaught exception handler. If null then this thread has no explicit handler.
*/
public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
- for (final StreamThread thread : this.threads)
+ for (final StreamThread thread : threads)
thread.setUncaughtExceptionHandler(eh);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e876df8b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 7330810..35b88db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -22,15 +22,12 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.test.MockMetricsReporter;
-import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
-import java.io.File;
import java.util.Properties;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class KafkaStreamsTest {
@@ -123,25 +120,25 @@ public class KafkaStreamsTest {
@Test(expected = IllegalStateException.class)
public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
- KafkaStreams streams = createKafkaStreams();
+ final KafkaStreams streams = createKafkaStreams();
streams.allMetadata();
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
- KafkaStreams streams = createKafkaStreams();
+ final KafkaStreams streams = createKafkaStreams();
streams.allMetadataForStore("store");
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
- KafkaStreams streams = createKafkaStreams();
+ final KafkaStreams streams = createKafkaStreams();
streams.metadataForKey("store", "key", Serdes.String().serializer());
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
- KafkaStreams streams = createKafkaStreams();
+ final KafkaStreams streams = createKafkaStreams();
streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
@Override
public Integer partition(final String key, final Object value, final int numPartitions) {
@@ -152,11 +149,11 @@ public class KafkaStreamsTest {
private KafkaStreams createKafkaStreams() {
- Properties props = new Properties();
+ final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- KStreamBuilder builder = new KStreamBuilder();
+ final KStreamBuilder builder = new KStreamBuilder();
return new KafkaStreams(builder, props);
}
@@ -175,40 +172,6 @@ public class KafkaStreamsTest {
streams.cleanUp();
}
- @Test
- public void testCleanupIsolation() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
-
- final String appId1 = "testIsolation-1";
- final String appId2 = "testIsolation-2";
- final String stateDir = TestUtils.tempDirectory().getPath();
- final File stateDirApp1 = new File(stateDir + File.separator + appId1);
- final File stateDirApp2 = new File(stateDir + File.separator + appId2);
-
- final Properties props = new Properties();
- props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
-
- assertFalse(stateDirApp1.exists());
- assertFalse(stateDirApp2.exists());
-
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId1);
- final KafkaStreams streams1 = new KafkaStreams(builder, props);
- props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId2);
- final KafkaStreams streams2 = new KafkaStreams(builder, props);
-
- assertTrue(stateDirApp1.exists());
- assertTrue(stateDirApp2.exists());
-
- streams1.cleanUp();
- assertFalse(stateDirApp1.exists());
- assertTrue(stateDirApp2.exists());
-
- streams2.cleanUp();
- assertFalse(stateDirApp1.exists());
- assertFalse(stateDirApp2.exists());
- }
-
@Test(expected = IllegalStateException.class)
public void testCannotCleanupWhileRunning() throws Exception {
final Properties props = new Properties();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e876df8b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index ccafa30..5847fb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -135,6 +135,7 @@ public class ResetIntegrationTest {
"Streams Application consumer group did not time out after " + (5 * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
+ streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
streams.cleanUp();
cleanGlobal();
TestUtils.waitForCondition(consumerGroupInactive, 5 * CLEANUP_CONSUMER_TIMEOUT,
@@ -143,7 +144,6 @@ public class ResetIntegrationTest {
assertInternalTopicsGotDeleted();
// RE-RUN
- streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
streams.start();
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,