Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/07 20:13:31 UTC

kafka git commit: HOTFIX: recreate state.dir after cleanup

Repository: kafka
Updated Branches:
  refs/heads/trunk 454f6845b -> e876df8b3


HOTFIX: recreate state.dir after cleanup

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #1940 from mjsax/hotfix-resetTool


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e876df8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e876df8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e876df8b

Branch: refs/heads/trunk
Commit: e876df8b37fc6ea54b0a0571306c4a833c919cda
Parents: 454f684
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Oct 7 13:13:27 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 7 13:13:27 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  | 86 ++++++++++----------
 .../apache/kafka/streams/KafkaStreamsTest.java  | 49 ++---------
 .../integration/ResetIntegrationTest.java       |  2 +-
 3 files changed, 50 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
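
In short, this change reworks KafkaStreams.cleanUp(): instead of deleting the application's entire state directory with Utils.delete(), it now delegates to StateDirectory.cleanRemovedTasks(), and constructing the StateDirectory (re)creates the application's directory, which appears to be what the commit title refers to. The caller-side lifecycle is unchanged; a minimal sketch (application id and bootstrap servers are illustrative, not from this commit):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    final Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // illustrative
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative

    final KafkaStreams streams = new KafkaStreams(new KStreamBuilder(), props);
    streams.cleanUp();   // legal only while not RUNNING, else IllegalStateException
    streams.start();
    // ... let the application run ...
    streams.close();     // after close(), cleanUp() may be called again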


http://git-wip-us.apache.org/repos/asf/kafka/blob/e876df8b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d88d09e..2333db7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -25,18 +25,20 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
-import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.StreamsMetadata;
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
-import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.internals.StateStoreProvider;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -47,9 +49,6 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
  * sends output to zero or more output topics.
@@ -110,7 +109,7 @@ public class KafkaStreams {
     // of the co-location of stream thread's consumers. It is for internal
     // usage only and should not be exposed to users at all.
     private final UUID processId;
-    private StreamsMetadataState streamsMetadataState;
+    private final StreamsMetadataState streamsMetadataState;
 
     private final StreamsConfig config;
 
@@ -146,7 +145,7 @@ public class KafkaStreams {
         // create the metrics
         final Time time = new SystemTime();
 
-        this.processId = UUID.randomUUID();
+        processId = UUID.randomUUID();
 
         this.config = config;
 
@@ -167,24 +166,24 @@ public class KafkaStreams {
             .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                 TimeUnit.MILLISECONDS);
 
-        this.metrics = new Metrics(metricConfig, reporters, time);
+        metrics = new Metrics(metricConfig, reporters, time);
 
-        this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
         streamsMetadataState = new StreamsMetadataState(builder);
-        for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder,
-                                               config,
-                                               clientSupplier,
-                                               applicationId,
-                                               clientId,
-                                               processId,
-                                               metrics,
-                                               time,
-                                               streamsMetadataState);
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new StreamThread(builder,
+                config,
+                clientSupplier,
+                applicationId,
+                clientId,
+                processId,
+                metrics,
+                time,
+                streamsMetadataState);
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
-        this.queryableStoreProvider = new QueryableStoreProvider(storeProviders);
+        queryableStoreProvider = new QueryableStoreProvider(storeProviders);
     }
 
     /**
@@ -195,14 +194,14 @@ public class KafkaStreams {
     public synchronized void start() {
         log.debug("Starting Kafka Stream process");
 
-        if (this.state == CREATED) {
-            for (final StreamThread thread : this.threads)
+        if (state == CREATED) {
+            for (final StreamThread thread : threads)
                 thread.start();
 
-            this.state = RUNNING;
+            state = RUNNING;
 
             log.info("Started Kafka Stream process");
-        } else if (this.state == RUNNING) {
+        } else if (state == RUNNING) {
             throw new IllegalStateException("This process was already started.");
         } else {
             throw new IllegalStateException("Cannot restart after closing.");
@@ -218,12 +217,12 @@ public class KafkaStreams {
     public synchronized void close() {
         log.debug("Stopping Kafka Stream process");
 
-        if (this.state == RUNNING) {
+        if (state == RUNNING) {
             // signal the threads to stop and wait
-            for (final StreamThread thread : this.threads)
+            for (final StreamThread thread : threads)
                 thread.close();
 
-            for (final StreamThread thread : this.threads) {
+            for (final StreamThread thread : threads) {
                 try {
                     thread.join();
                 } catch (final InterruptedException ex) {
@@ -232,9 +231,9 @@ public class KafkaStreams {
             }
         }
 
-        if (this.state != STOPPED) {
-            this.metrics.close();
-            this.state = STOPPED;
+        if (state != STOPPED) {
+            metrics.close();
+            state = STOPPED;
             log.info("Stopped Kafka Stream process");
         }
 
@@ -247,9 +246,9 @@ public class KafkaStreams {
      * @return A string representation of the Kafka Streams instance.
      */
     public String toString() {
-        StringBuilder sb = new StringBuilder("KafkaStreams processID:" + this.processId + "\n");
-        for (int i = 0; i < this.threads.length; i++) {
-            sb.append("\t" + this.threads[i].toString());
+        final StringBuilder sb = new StringBuilder("KafkaStreams processID:" + processId + "\n");
+        for (final StreamThread thread : threads) {
+            sb.append("\t").append(thread.toString());
         }
         sb.append("\n");
 
@@ -264,19 +263,20 @@ public class KafkaStreams {
      * @throws IllegalStateException if instance is currently running
      */
     public void cleanUp() {
-        if (this.state == RUNNING) {
+        if (state == RUNNING) {
             throw new IllegalStateException("Cannot clean up while running.");
         }
 
-        final String localApplicationDir = this.config.getString(StreamsConfig.STATE_DIR_CONFIG)
-            + File.separator
-            + this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+        final String appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+        final String stateDir = config.getString(StreamsConfig.STATE_DIR_CONFIG);
 
-        log.debug("Clean up local Kafka Streams data in {}", localApplicationDir);
+        final String localApplicationDir = stateDir + File.separator + appId;
         log.debug("Removing local Kafka Streams application data in {} for application {}",
             localApplicationDir,
-            this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
-        Utils.delete(new File(localApplicationDir));
+            appId);
+
+        final StateDirectory stateDirectory = new StateDirectory(appId, stateDir);
+        stateDirectory.cleanRemovedTasks();
     }
 
     /**
@@ -285,7 +285,7 @@ public class KafkaStreams {
      * @param eh the object to use as this thread's uncaught exception handler. If null then this thread has no explicit handler.
      */
     public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
-        for (final StreamThread thread : this.threads)
+        for (final StreamThread thread : threads)
             thread.setUncaughtExceptionHandler(eh);
     }
 

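Behaviorally, the old cleanUp() removed the whole <state.dir>/<application.id> directory outright, while the new one goes through StateDirectory, whose construction (re)creates that directory before cleanRemovedTasks() clears out state for tasks no longer in use (going by the method's name). The observable difference, as a hypothetical check (variable names are illustrative):

    final File appStateDir = new File(stateDir + File.separator + appId);
    streams.cleanUp();
    // before this commit: appStateDir.exists() == false  (directory deleted)
    // after this commit:  appStateDir.exists() == true   (directory recreated)
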
http://git-wip-us.apache.org/repos/asf/kafka/blob/e876df8b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 7330810..35b88db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -22,15 +22,12 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.test.MockMetricsReporter;
-import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.Properties;
 
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class KafkaStreamsTest {
@@ -123,25 +120,25 @@ public class KafkaStreamsTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
-        KafkaStreams streams = createKafkaStreams();
+        final KafkaStreams streams = createKafkaStreams();
         streams.allMetadata();
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
-        KafkaStreams streams = createKafkaStreams();
+        final KafkaStreams streams = createKafkaStreams();
         streams.allMetadataForStore("store");
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
-        KafkaStreams streams = createKafkaStreams();
+        final KafkaStreams streams = createKafkaStreams();
         streams.metadataForKey("store", "key", Serdes.String().serializer());
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
-        KafkaStreams streams = createKafkaStreams();
+        final KafkaStreams streams = createKafkaStreams();
         streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
             @Override
             public Integer partition(final String key, final Object value, final int numPartitions) {
@@ -152,11 +149,11 @@ public class KafkaStreamsTest {
 
 
     private KafkaStreams createKafkaStreams() {
-        Properties props = new Properties();
+        final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
-        KStreamBuilder builder = new KStreamBuilder();
+        final KStreamBuilder builder = new KStreamBuilder();
         return new KafkaStreams(builder, props);
     }
 
@@ -175,40 +172,6 @@ public class KafkaStreamsTest {
         streams.cleanUp();
     }
 
-    @Test
-    public void testCleanupIsolation() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-
-        final String appId1 = "testIsolation-1";
-        final String appId2 = "testIsolation-2";
-        final String stateDir = TestUtils.tempDirectory().getPath();
-        final File stateDirApp1 = new File(stateDir + File.separator + appId1);
-        final File stateDirApp2 = new File(stateDir + File.separator + appId2);
-
-        final Properties props = new Properties();
-        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
-
-        assertFalse(stateDirApp1.exists());
-        assertFalse(stateDirApp2.exists());
-
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId1);
-        final KafkaStreams streams1 = new KafkaStreams(builder, props);
-        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId2);
-        final KafkaStreams streams2 = new KafkaStreams(builder, props);
-
-        assertTrue(stateDirApp1.exists());
-        assertTrue(stateDirApp2.exists());
-
-        streams1.cleanUp();
-        assertFalse(stateDirApp1.exists());
-        assertTrue(stateDirApp2.exists());
-
-        streams2.cleanUp();
-        assertFalse(stateDirApp1.exists());
-        assertFalse(stateDirApp2.exists());
-    }
-
     @Test(expected = IllegalStateException.class)
     public void testCannotCleanupWhileRunning() throws Exception {
         final Properties props = new Properties();

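The removed testCleanupIsolation encoded the old contract, asserting assertFalse(stateDirApp1.exists()) after cleanUp(); since the directory is now recreated rather than deleted, those assertions would fail. An isolation check under the new semantics might instead look like this hypothetical rewrite:

    streams1.cleanUp();
    assertTrue(stateDirApp1.exists());  // recreated, not deleted
    assertTrue(stateDirApp2.exists());  // the other application is untouched
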
http://git-wip-us.apache.org/repos/asf/kafka/blob/e876df8b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index ccafa30..5847fb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -135,6 +135,7 @@ public class ResetIntegrationTest {
             "Streams Application consumer group did not time out after " + (5 * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
+        streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
         streams.cleanUp();
         cleanGlobal();
         TestUtils.waitForCondition(consumerGroupInactive, 5 * CLEANUP_CONSUMER_TIMEOUT,
@@ -143,7 +144,6 @@ public class ResetIntegrationTest {
         assertInternalTopicsGotDeleted();
 
         // RE-RUN
-        streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
         streams.start();
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
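
The one-line move in this test creates the KafkaStreams instance for the re-run before calling cleanUp() on it, instead of cleaning up through the instance left over from the first run — presumably to exercise the create, cleanUp(), start() sequence that the reworked cleanUp() is meant to support. The resulting reset flow, in outline (identifiers taken from the test above):

    streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
    streams.cleanUp();   // remove local state; the state directory is recreated
    cleanGlobal();       // broker-side reset; internal topics are verified deleted afterwards
    streams.start();     // re-run from scratch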