You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/04/05 01:40:59 UTC

kafka git commit: KAFKA-4916: test streams with brokers failing

Repository: kafka
Updated Branches:
  refs/heads/trunk 916081007 -> 49f80b236


KAFKA-4916: test streams with brokers failing

Several fixes for handling broker failures:
- default replication value for internal topics is now 3 in the test itself (not in streams code; that will require a KIP).
- streams producer waits for acks from all replicas in the test itself (not in streams code; that will require a KIP).
- backoff time for streams client to try again after a failure to contact controller.
- fix bug related to state store locks (this helps in multi-threaded scenarios)
- fix related to catching exceptions properly for network errors.
- system test for all the above

Author: Eno Thereska <en...@confluent.io>
Author: Eno Thereska <en...@gmail.com>

Reviewers: Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Dan Norwood <no...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #2719 from enothereska/KAFKA-4916-broker-bounce-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/49f80b23
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/49f80b23
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/49f80b23

Branch: refs/heads/trunk
Commit: 49f80b2360ab55274ad28405352596f9d560ec1c
Parents: 9160810
Author: Eno Thereska <en...@confluent.io>
Authored: Tue Apr 4 18:32:58 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Apr 4 18:32:58 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java |   2 +-
 .../internals/InternalTopicManager.java         |  32 ++-
 .../internals/StreamPartitionAssignor.java      |  13 +-
 .../processor/internals/StreamThread.java       |  53 +++--
 .../processor/internals/StreamsKafkaClient.java |  20 +-
 .../internals/InternalTopicManagerTest.java     |  15 +-
 .../kafka/streams/tests/SmokeTestClient.java    |  29 ++-
 .../kafka/streams/tests/SmokeTestDriver.java    |  20 +-
 .../kafka/test/MockInternalTopicManager.java    |   5 +-
 .../tests/streams/streams_bounce_test.py        |   4 +-
 .../tests/streams/streams_broker_bounce_test.py | 213 +++++++++++++++++++
 11 files changed, 354 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 94719c6..52721ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -241,7 +241,7 @@ public class StreamsConfig extends AbstractConfig {
             .define(REPLICATION_FACTOR_CONFIG,
                     Type.INT,
                     1,
-                    Importance.MEDIUM,
+                    Importance.HIGH,
                     REPLICATION_FACTOR_DOC)
             .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                     Type.CLASS,

http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index d8575e9..7dab99d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,16 +36,18 @@ public class InternalTopicManager {
     public static final String RETENTION_MS = "retention.ms";
     public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
     private static final int MAX_TOPIC_READY_TRY = 5;
-
+    private final Time time;
     private final long windowChangeLogAdditionalRetention;
 
     private final int replicationFactor;
     private final StreamsKafkaClient streamsKafkaClient;
 
-    public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor, final long windowChangeLogAdditionalRetention) {
+    public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor,
+                                final long windowChangeLogAdditionalRetention, final Time time) {
         this.streamsKafkaClient = streamsKafkaClient;
         this.replicationFactor = replicationFactor;
         this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
+        this.time = time;
     }
 
     /**
@@ -60,11 +63,19 @@ public class InternalTopicManager {
                 final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
                 final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
                 final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions);
+                if (metadata.brokers().size() < replicationFactor) {
+                    throw new StreamsException("Found only " + metadata.brokers().size() + " brokers, " +
+                        " but replication factor is " + replicationFactor + "." +
+                        " Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\""  +
+                        " or add more brokers to your cluster.");
+                }
                 streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention, metadata);
                 return;
             } catch (StreamsException ex) {
                 log.warn("Could not create internal topics: " + ex.getMessage() + " Retry #" + i);
             }
+            // backoff
+            time.sleep(100L);
         }
         throw new StreamsException("Could not create internal topics.");
     }
@@ -73,11 +84,20 @@ public class InternalTopicManager {
      * Get the number of partitions for the given topics
      */
     public Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
-        final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
-        existingTopicPartitions.keySet().retainAll(topics);
+        for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
+            try {
+                final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
+                final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
+                existingTopicPartitions.keySet().retainAll(topics);
 
-        return existingTopicPartitions;
+                return existingTopicPartitions;
+            } catch (StreamsException ex) {
+                log.warn("Could not get number of partitions: " + ex.getMessage() + " Retry #" + i);
+            }
+            // backoff
+            time.sleep(100L);
+        }
+        throw new StreamsException("Could not get number of partitions.");
     }
 
     public void close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 24e6709..004926f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
@@ -57,7 +58,7 @@ import static org.apache.kafka.streams.processor.internals.InternalTopicManager.
 public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
 
     private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
-
+    private Time time = Time.SYSTEM;
     private final static int UNKNOWN = -1;
     public final static int NOT_AVAILABLE = -2;
 
@@ -159,6 +160,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     private CopartitionedTopicsValidator copartitionedTopicsValidator;
 
     /**
+     * Package-private method to set the time. Used for tests.
+     * @param time Time to be used.
+     */
+    void time(final Time time) {
+        this.time = time;
+    }
+
+    /**
      * We need to have the PartitionAssignor and its StreamThread to be mutually accessible
      * since the former needs later's cached metadata while sending subscriptions,
      * and the latter needs former's returned assignment when adding tasks.
@@ -207,7 +216,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1,
                 configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ?
                         (Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)
-                        : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+                        : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
 
         this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(streamThread.getName());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9791a0a..8bd6d1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -401,17 +401,21 @@ public class StreamThread extends Thread {
 
     @SuppressWarnings("ThrowableNotThrown")
     private void shutdownTasksAndState() {
-        log.debug("{} shutdownTasksAndState: shutting down all active tasks {} and standby tasks {}", logPrefix,
-            activeTasks.keySet(), standbyTasks.keySet());
+        log.debug("{} shutdownTasksAndState: shutting down" +
+                "active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}",
+            logPrefix, activeTasks.keySet(), standbyTasks.keySet(),
+            suspendedTasks.keySet(), suspendedStandbyTasks.keySet());
 
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
         // Close all processors in topology order
-        firstException.compareAndSet(null, closeAllTasks());
+        firstException.compareAndSet(null, closeTasks(activeAndStandbytasks()));
+        firstException.compareAndSet(null, closeTasks(suspendedAndSuspendedStandbytasks()));
         // flush state
         firstException.compareAndSet(null, flushAllState());
         // Close all task state managers. Don't need to set exception as all
         // state would have been flushed above
-        closeAllStateManagers(firstException.get() == null);
+        closeStateManagers(activeAndStandbytasks(), firstException.get() == null);
+        closeStateManagers(suspendedAndSuspendedStandbytasks(), firstException.get() == null);
         // only commit under clean exit
         if (cleanRun && firstException.get() == null) {
             firstException.set(commitOffsets());
@@ -430,7 +434,7 @@ public class StreamThread extends Thread {
             activeTasks.keySet(), standbyTasks.keySet());
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
         // Close all topology nodes
-        firstException.compareAndSet(null, closeAllTasksTopologies());
+        firstException.compareAndSet(null, closeActiveAndStandbyTasksTopologies());
         // flush state
         firstException.compareAndSet(null, flushAllState());
         // only commit after all state has been flushed and there hasn't been an exception
@@ -453,12 +457,11 @@ public class StreamThread extends Thread {
         void apply(final AbstractTask task);
     }
 
-    private RuntimeException performOnAllTasks(final AbstractTaskAction action,
-                                   final String exceptionMessage) {
+    private RuntimeException performOnTasks(final List<AbstractTask> tasks,
+                                            final AbstractTaskAction action,
+                                            final String exceptionMessage) {
         RuntimeException firstException = null;
-        final List<AbstractTask> allTasks = new ArrayList<AbstractTask>(activeTasks.values());
-        allTasks.addAll(standbyTasks.values());
-        for (final AbstractTask task : allTasks) {
+        for (final AbstractTask task : tasks) {
             try {
                 action.apply(task);
             } catch (RuntimeException t) {
@@ -476,8 +479,20 @@ public class StreamThread extends Thread {
         return firstException;
     }
 
-    private Throwable closeAllStateManagers(final boolean writeCheckpoint) {
-        return performOnAllTasks(new AbstractTaskAction() {
+    private List<AbstractTask> activeAndStandbytasks() {
+        final List<AbstractTask> tasks = new ArrayList<AbstractTask>(activeTasks.values());
+        tasks.addAll(standbyTasks.values());
+        return tasks;
+    }
+
+    private List<AbstractTask> suspendedAndSuspendedStandbytasks() {
+        final List<AbstractTask> tasks = new ArrayList<AbstractTask>(suspendedTasks.values());
+        tasks.addAll(suspendedStandbyTasks.values());
+        return tasks;
+    }
+
+    private Throwable closeStateManagers(final List<AbstractTask> tasks, final boolean writeCheckpoint) {
+        return performOnTasks(tasks, new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Closing the state manager of task {}", StreamThread.this.logPrefix, task.id());
@@ -488,7 +503,7 @@ public class StreamThread extends Thread {
 
     private RuntimeException commitOffsets() {
         // Exceptions should not prevent this call from going through all shutdown steps
-        return performOnAllTasks(new AbstractTaskAction() {
+        return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Committing consumer offsets of task {}", StreamThread.this.logPrefix, task.id());
@@ -498,7 +513,7 @@ public class StreamThread extends Thread {
     }
 
     private RuntimeException flushAllState() {
-        return performOnAllTasks(new AbstractTaskAction() {
+        return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Flushing state stores of task {}", StreamThread.this.logPrefix, task.id());
@@ -579,6 +594,7 @@ public class StreamThread extends Thread {
         }
     }
 
+
     /**
      * Schedule the records processing by selecting which record is processed next. Commits may
      * happen as records are processed.
@@ -1072,8 +1088,8 @@ public class StreamThread extends Thread {
         standbyRecords.clear();
     }
 
-    private RuntimeException closeAllTasks() {
-        return performOnAllTasks(new AbstractTaskAction() {
+    private RuntimeException closeTasks(final List<AbstractTask> tasks) {
+        return performOnTasks(tasks, new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Closing task {}", StreamThread.this.logPrefix, task.id());
@@ -1083,8 +1099,9 @@ public class StreamThread extends Thread {
         }, "close");
     }
 
-    private RuntimeException closeAllTasksTopologies() {
-        return performOnAllTasks(new AbstractTaskAction() {
+
+    private RuntimeException closeActiveAndStandbyTasksTopologies() {
+        return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() {
             @Override
             public void apply(final AbstractTask task) {
                 log.info("{} Closing task's topology {}", StreamThread.this.logPrefix, task.id());

http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 3217e46..ca3ab1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -205,7 +205,11 @@ public class StreamsKafkaClient {
                     break;
                 }
             }
-            kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+            try {
+                kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+            } catch (final Exception e) {
+                throw new StreamsException("Could not poll.", e);
+            }
         }
         if (brokerId == null) {
             throw new StreamsException("Could not find any available broker.");
@@ -237,11 +241,20 @@ public class StreamsKafkaClient {
     }
 
     private ClientResponse sendRequest(final ClientRequest clientRequest) {
-        kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
+        try {
+            kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
+        } catch (final Exception e) {
+            throw new StreamsException("Could not send request.", e);
+        }
         final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
         // Poll for the response.
         while (Time.SYSTEM.milliseconds() < responseTimeout) {
-            List<ClientResponse> responseList = kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+            final List<ClientResponse> responseList;
+            try {
+                responseList = kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+            } catch (final IllegalStateException e) {
+                throw new StreamsException("Could not poll.", e);
+            }
             if (!responseList.isEmpty()) {
                 if (responseList.size() > 1) {
                     throw new StreamsException("Sent one request but received multiple or no responses.");
@@ -271,6 +284,7 @@ public class StreamsKafkaClient {
             Time.SYSTEM.milliseconds(),
             true);
         final ClientResponse clientResponse = sendRequest(clientRequest);
+
         if (!clientResponse.hasResponse()) {
             throw new StreamsException("Empty response for client request.");
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 84f0e8a..031c732 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -40,7 +42,7 @@ public class InternalTopicManagerTest {
     private final String topic = "test_topic";
     private final String userEndPoint = "localhost:2171";
     private MockStreamKafkaClient streamsKafkaClient;
-
+    private final Time time = new MockTime();
     @Before
     public void init() {
         final StreamsConfig config = new StreamsConfig(configProps());
@@ -54,19 +56,22 @@ public class InternalTopicManagerTest {
 
     @Test
     public void shouldReturnCorrectPartitionCounts() throws Exception {
-        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
+            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
         Assert.assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
     }
 
     @Test
     public void shouldCreateRequiredTopics() throws Exception {
-        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
+            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
         internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1));
     }
 
     @Test
     public void shouldNotCreateTopicIfExistsWithDifferentPartitions() throws Exception {
-        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
+            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
         boolean exceptionWasThrown = false;
         try {
             internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 2));
@@ -104,7 +109,7 @@ public class InternalTopicManagerTest {
             Node node = new Node(1, "host1", 1001);
             MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>());
             MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
-            MetadataResponse response = new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID,
+            MetadataResponse response = new MetadataResponse(Collections.<Node>singletonList(node), null, MetadataResponse.NO_CONTROLLER_ID,
                 Collections.singletonList(topicMetadata));
             return response;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 7691948..263474c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -39,6 +40,7 @@ public class SmokeTestClient extends SmokeTestUtil {
     private final File stateDir;
     private KafkaStreams streams;
     private Thread thread;
+    private boolean uncaughtException = false;
 
     public SmokeTestClient(File stateDir, String kafka) {
         super();
@@ -51,10 +53,19 @@ public class SmokeTestClient extends SmokeTestUtil {
         streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(Thread t, Throwable e) {
+                System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
+                uncaughtException = true;
                 e.printStackTrace();
             }
         });
 
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                close();
+            }
+        }));
+
         thread = new Thread() {
             public void run() {
                 streams.start();
@@ -64,32 +75,38 @@ public class SmokeTestClient extends SmokeTestUtil {
     }
 
     public void close() {
-        streams.close();
+        streams.close(5, TimeUnit.SECONDS);
+        // do not remove these printouts since they are needed for health scripts
+        if (!uncaughtException) {
+            System.out.println("SMOKE-TEST-CLIENT-CLOSED");
+        }
         try {
             thread.join();
         } catch (Exception ex) {
+            // do not remove these printouts since they are needed for health scripts
+            System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
             // ignore
         }
     }
 
     private static KafkaStreams createKafkaStreams(File stateDir, String kafka) {
-        Properties props = new Properties();
+        final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+        props.put(ProducerConfig.ACKS_CONFIG, "all");
 
-        KStreamBuilder builder = new KStreamBuilder();
 
+        KStreamBuilder builder = new KStreamBuilder();
         KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");
-
         source.to(stringSerde, intSerde, "echo");
-
         KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
             @Override
             public boolean test(String key, Integer value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index b3520fb..11e1ae8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -131,13 +131,17 @@ public class SmokeTestDriver extends SmokeTestUtil {
     }
 
     public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
-        Properties props = new Properties();
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        // the next 4 config values make sure that all records are produced with no loss and
+        // no duplicates
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
 
         int numRecordsProduced = 0;
 
@@ -232,7 +236,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         }
         int retry = 0;
         final long start = System.currentTimeMillis();
-        while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(3)) {
+        while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
             ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
index 58c6c6d..d6ada4b 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.test;
 
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
 import org.apache.kafka.streams.processor.internals.InternalTopicManager;
@@ -29,13 +30,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+
+
 public class MockInternalTopicManager extends InternalTopicManager {
 
     public Map<String, Integer> readyTopics = new HashMap<>();
     private MockConsumer<byte[], byte[]> restoreConsumer;
 
     public MockInternalTopicManager(StreamsConfig streamsConfig, MockConsumer<byte[], byte[]> restoreConsumer) {
-        super(new StreamsKafkaClient(streamsConfig), 0, 0);
+        super(new StreamsKafkaClient(streamsConfig), 0, 0, new MockTime());
 
         this.restoreConsumer = restoreConsumer;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/tests/kafkatest/tests/streams/streams_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_bounce_test.py b/tests/kafkatest/tests/streams/streams_bounce_test.py
index 169bbc1..7ac7939 100644
--- a/tests/kafkatest/tests/streams/streams_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_bounce_test.py
@@ -26,7 +26,7 @@ class StreamsBounceTest(KafkaTest):
     """
 
     def __init__(self, test_context):
-        super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
+        super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
             'echo' : { 'partitions': 5, 'replication-factor': 2 },
             'data' : { 'partitions': 5, 'replication-factor': 2 },
             'min' : { 'partitions': 5, 'replication-factor': 2 },
@@ -42,7 +42,7 @@ class StreamsBounceTest(KafkaTest):
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
         self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
-    @cluster(num_nodes=5)
+    @cluster(num_nodes=6)
     def test_bounce(self):
         """
         Start a smoke test client, then abort (kill -9) and restart it a few times.

http://git-wip-us.apache.org/repos/asf/kafka/blob/49f80b23/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
new file mode 100644
index 0000000..86c19f9
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+from ducktape.mark import matrix
+from ducktape.mark import parametrize, ignore
+from kafkatest.services.kafka import KafkaService
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+import time
+import signal
+from random import randint
+
+def broker_node(test, topic, broker_type):
+    """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0
+    """
+    if broker_type == "leader":
+        node = test.kafka.leader(topic, partition=0)
+    elif broker_type == "controller":
+        node = test.kafka.controller()
+    else:
+        raise Exception("Unexpected broker type %s." % (broker_type))
+
+    return node
+
+def signal_node(test, node, sig):
+    test.kafka.signal_node(node, sig)
+    
+def clean_shutdown(test, topic, broker_type):
+    """Discover broker node of requested type and shut it down cleanly.
+    """
+    node = broker_node(test, topic, broker_type)
+    signal_node(test, node, signal.SIGTERM)
+
+def hard_shutdown(test, topic, broker_type):
+    """Discover broker node of requested type and shut it down with a hard kill."""
+    node = broker_node(test, topic, broker_type)
+    signal_node(test, node, signal.SIGKILL)
+
+    
+failures = {
+    "clean_shutdown": clean_shutdown,
+    "hard_shutdown": hard_shutdown
+}
+        
+class StreamsBrokerBounceTest(Test):
+    """
+    Simple test of Kafka Streams with brokers failing
+    """
+
+    def __init__(self, test_context):
+        super(StreamsBrokerBounceTest, self).__init__(test_context)
+        self.replication = 3
+        self.partitions = 3
+        self.topics = {
+            'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": 2}},
+            'data' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": 2} },
+            'min' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": 2} },
+            'max' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": 2} },
+            'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": 2} },
+            'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": 2} },
+            'cnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": 2} },
+            'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                      'configs': {"min.insync.replicas": 2} },
+            'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": 2} },
+            'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": 2} }
+        }
+
+    def fail_broker_type(self, failure_mode, broker_type):
+        # Pick a random topic and bounce its leader
+        topic_index = randint(0, len(self.topics.keys()) - 1)
+        topic = self.topics.keys()[topic_index]
+        failures[failure_mode](self, topic, broker_type)
+
+    def fail_many_brokers(self, failure_mode, num_failures):
+        sig = signal.SIGTERM
+        if (failure_mode == "clean_shutdown"):
+            sig = signal.SIGTERM
+        else:
+            sig = signal.SIGKILL
+            
+        for num in range(0, num_failures - 1):
+            signal_node(self, self.kafka.nodes[num], sig)
+
+        
+    def setup_system(self):
+         # Setup phase
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
+        
+        self.kafka = KafkaService(self.test_context, num_nodes=self.replication,
+                                  zk=self.zk, topics=self.topics)
+        self.kafka.start()
+        # Start test harness
+        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
+
+        
+        self.driver.start()
+        self.processor1.start()
+
+    def collect_results(self, sleep_time_secs):
+        data = {}
+        # End test
+        self.driver.wait()
+        self.driver.stop()
+
+        self.processor1.stop()
+
+        node = self.driver.node
+        
+        # Success is declared if streams does not crash when sleep time > 0
+        # It should give an exception when sleep time is 0 since we kill the brokers immediately
+        # and the topic manager cannot create internal topics with the desired replication factor
+        if (sleep_time_secs == 0):
+            output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-EXCEPTION %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+        else:
+            output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
+            
+        for line in output_streams:
+            data["Client closed"] = line
+
+        # Currently it is hard to guarantee anything about Kafka since we don't have exactly once.
+        # With exactly once in place, success will be defined as ALL-RECORDS-DELIVERED and SUCCESS
+        output = node.account.ssh_capture("grep -E 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED|PROCESSED-LESS-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
+        for line in output:
+            data["Records Delivered"] = line
+        output = node.account.ssh_capture("grep -E 'SUCCESS|FAILURE' %s" % self.driver.STDOUT_FILE, allow_fail=False)
+        for line in output:
+            data["Logic Success/Failure"] = line
+            
+        
+        return data
+
+    @cluster(num_nodes=7)
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown"],
+            broker_type=["leader", "controller"],
+            sleep_time_secs=[120])
+    def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs):
+        """
+        Start a smoke test client, then kill one particular broker and ensure data is still received
+        Record if records are delivered. 
+        """
+        self.setup_system() 
+
+        # Sleep to allow test to run for a bit
+        time.sleep(sleep_time_secs)
+
+        # Fail brokers
+        self.fail_broker_type(failure_mode, broker_type);
+
+        return self.collect_results(sleep_time_secs)
+
+    @cluster(num_nodes=7)
+    @matrix(failure_mode=["clean_shutdown"],
+            broker_type=["controller"],
+            sleep_time_secs=[0])
+    def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs):
+        """
+        Start a smoke test client, then kill one particular broker immediately before streams starts
+        Streams should throw an exception since it cannot create topics with the desired
+        replication factor of 3
+        """
+        self.setup_system() 
+
+        # Sleep to allow test to run for a bit
+        time.sleep(sleep_time_secs)
+
+        # Fail brokers
+        self.fail_broker_type(failure_mode, broker_type);
+
+        return self.collect_results(sleep_time_secs)
+    
+    @cluster(num_nodes=7)
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown"],
+            num_failures=[2])
+    def test_many_brokers_bounce(self, failure_mode, num_failures):
+        """
+        Start a smoke test client, then kill a few brokers and ensure data is still received
+        Record if records are delivered
+        """
+        self.setup_system() 
+
+        # Sleep to allow test to run for a bit
+        time.sleep(120)
+
+        # Fail brokers
+        self.fail_many_brokers(failure_mode, num_failures);
+
+        return self.collect_results(120)