Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/30 20:13:28 UTC

kafka git commit: MINOR: Code cleanup and JavaDoc improvements for clients and Streams

Repository: kafka
Updated Branches:
  refs/heads/trunk 7fe88e8bd -> c7ab3efcb


MINOR: Code cleanup and JavaDoc improvements for clients and Streams

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Bill Bejeck <bi...@confluent.io>, Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #4128 from mjsax/minor-cleanup

minor fix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c7ab3efc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c7ab3efc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c7ab3efc

Branch: refs/heads/trunk
Commit: c7ab3efcbe5d34c28e19a5a6a59962c2abfd2235
Parents: 7fe88e8
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Oct 30 13:11:18 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Oct 30 13:13:19 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/admin/KafkaAdminClient.java   |  14 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |  13 +-
 .../kafka/clients/producer/KafkaProducer.java   |   9 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |   8 +-
 .../internals/GlobalStateManagerImpl.java       |  22 +--
 .../processor/internals/GlobalStreamThread.java |  30 ++--
 .../internals/InternalTopicManager.java         |   7 +-
 .../internals/StoreChangelogReader.java         |  37 ++---
 .../processor/internals/StreamsKafkaClient.java |  86 ++++++++----
 .../processor/internals/TaskManager.java        |   5 +-
 .../apache/kafka/streams/state/HostInfo.java    |  20 +--
 .../internals/InternalTopicManagerTest.java     | 136 ++++++++++++++-----
 .../internals/StoreChangelogReaderTest.java     |  12 +-
 .../internals/StreamsKafkaClientTest.java       |   6 +-
 .../processor/internals/TaskManagerTest.java    |   8 +-
 .../tests/streams/streams_broker_bounce_test.py |   8 +-
 16 files changed, 263 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 74ecc81..1945f50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.clients.admin;
 
-import java.util.Set;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
@@ -65,13 +64,13 @@ import org.apache.kafka.common.requests.AlterConfigsRequest;
 import org.apache.kafka.common.requests.AlterConfigsResponse;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
-import org.apache.kafka.common.requests.CreatePartitionsRequest;
-import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsRequest;
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
@@ -108,6 +107,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -953,22 +953,22 @@ public class KafkaAdminClient extends AdminClient {
 
         @Override
         public void run() {
-            /**
+            /*
              * Maps nodes to calls that we want to send.
              */
             Map<Node, List<Call>> callsToSend = new HashMap<>();
 
-            /**
+            /*
              * Maps node ID strings to calls that have been sent.
              */
             Map<String, List<Call>> callsInFlight = new HashMap<>();
 
-            /**
+            /*
              * Maps correlation IDs to calls that have been sent.
              */
             Map<Integer, Call> correlationIdToCalls = new HashMap<>();
 
-            /**
+            /*
              * The previous metadata version which wasn't usable, or null if there is none.
              */
             Integer prevMetadataVersion = null;
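
[Editorial note] The hunk above demotes /** ... */ doc comments to plain /* ... */ block comments inside run(). JavaDoc comments only carry meaning on declarations (types, fields, methods, constructors); the javadoc tool ignores them on local variables, so the doc-comment form was misleading there. A minimal sketch of the distinction:

    public class CommentStyle {
        /**
         * JavaDoc belongs on declarations: the javadoc tool picks this up
         * because it annotates a method.
         */
        public void run() {
            /*
             * Inside a method body, a plain block comment is the right form;
             * the javadoc tool never processes comments on local variables.
             */
            final int localState = 0;
        }
    }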

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c676863..05bca22 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1388,6 +1388,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
+     * This method may issue a remote call to the server if there is no current position for the given partition.
+     * <p>
+     * This call will block until either the position could be determined or an unrecoverable error is
+     * encountered (in which case it is thrown to the caller).
      *
      * @param partition The partition to get the position for
      * @return The offset
@@ -1583,6 +1587,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
      *         such message.
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
      * @throws IllegalArgumentException if the target timestamp is negative.
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
      *         expiration of the configured request timeout
@@ -1617,6 +1622,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param partitions the partitions to get the earliest offsets.
      * @return The earliest available offsets for the given partitions
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
      *         expiration of the configured request timeout
      */
@@ -1646,6 +1652,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param partitions the partitions to get the end offsets.
      * @return The end offsets for the given partitions.
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
      *         expiration of the configured request timeout
      */
@@ -1666,9 +1673,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * timeout. See {@link #close(long, TimeUnit)} for details. Note that {@link #wakeup()}
      * cannot be used to interrupt close.
      *
-     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted
      *             before or while this function is called
+     * @throws org.apache.kafka.common.KafkaException for any other error during close
      */
     @Override
     public void close() {
@@ -1686,9 +1693,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
      *                non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
      * @param timeUnit The time unit for the {@code timeout}
-     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
-     * @throws InterruptException If the thread is interrupted before or while this function is called
      * @throws IllegalArgumentException If the {@code timeout} is negative.
+     * @throws InterruptException If the thread is interrupted before or while this function is called
+     * @throws org.apache.kafka.common.KafkaException for any other error during close
      */
     public void close(long timeout, TimeUnit timeUnit) {
         if (timeout < 0)
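
[Editorial note] The JavaDoc additions above make two contracts explicit: position() may issue a remote offset lookup and block until a position is known (or a fatal error is thrown), and close() now reports residual failures as KafkaException rather than AuthenticationException. A hedged caller sketch; the bootstrap address, group id, and topic name are hypothetical:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;

    public class PositionExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // hypothetical
            props.put("group.id", "position-example");         // hypothetical
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            final TopicPartition tp = new TopicPartition("input-topic", 0);  // hypothetical topic
            consumer.assign(Collections.singletonList(tp));
            try {
                // no position is known yet, so this may issue a remote offset
                // lookup and block until the position is determined (or an
                // unrecoverable error is thrown to us)
                final long next = consumer.position(tp);
                System.out.println("next offset to fetch: " + next);
            } finally {
                try {
                    consumer.close();
                } catch (final KafkaException e) {
                    // per the updated JavaDoc, close() reports errors as KafkaException
                    System.err.println("error while closing consumer: " + e);
                }
            }
        }
    }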

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3185786..8004180 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -745,7 +747,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
      *        indicates no callback)
      *
-     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws AuthenticationException if authentication fails. See the exception for more details
+     * @throws AuthorizationException fatal error indicating that the producer is not allowed to write
      * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started
      * @throws InterruptException If the thread is interrupted while blocked
      * @throws SerializationException If the key or value are not valid objects given the configured serializers
@@ -968,8 +971,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     /**
      * Get the partition metadata for the given topic. This can be used for custom partitioning.
-     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws AuthenticationException if authentication fails. See the exception for more details
+     * @throws AuthorizationException if not authorized to the specified topic. See the exception for more details
      * @throws InterruptException If the thread is interrupted while blocked
+     * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
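
[Editorial note] The new @throws entries document that partitionsFor() fails with TimeoutException when metadata cannot be refreshed within max.block.ms, and with AuthorizationException when the caller lacks access to the topic. A sketch of a caller that treats only the timeout as retriable; the bootstrap address, timeout value, and topic name are hypothetical:

    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.errors.TimeoutException;

    public class PartitionsForExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // hypothetical
            props.put("max.block.ms", "5000");                 // bounds the metadata wait
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                try {
                    final List<PartitionInfo> partitions = producer.partitionsFor("my-topic");  // hypothetical topic
                    System.out.println("partition count: " + partitions.size());
                } catch (final TimeoutException e) {
                    // metadata could not be refreshed within max.block.ms; retriable
                    System.err.println("metadata fetch timed out: " + e);
                }
            }
        }
    }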

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 6e48f19..9ad02ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -61,7 +61,6 @@ import org.apache.kafka.streams.state.internals.StateStoreProvider;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
 import org.slf4j.Logger;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -711,12 +710,7 @@ public class KafkaStreams {
 
         client.checkBrokerCompatibility(EXACTLY_ONCE.equals(config.getString(PROCESSING_GUARANTEE_CONFIG)));
 
-        try {
-            client.close();
-        } catch (final IOException e) {
-            log.warn("Could not close StreamKafkaClient.", e);
-        }
-
+        client.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 10a0775..53dd51c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -55,7 +55,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     private static final Logger log = LoggerFactory.getLogger(GlobalStateManagerImpl.class);
 
     private final ProcessorTopology topology;
-    private final Consumer<byte[], byte[]> consumer;
+    private final Consumer<byte[], byte[]> globalConsumer;
     private final StateDirectory stateDirectory;
     private final Map<String, StateStore> stores = new LinkedHashMap<>();
     private final File baseDir;
@@ -65,11 +65,11 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     private final StateRestoreListener stateRestoreListener;
 
     public GlobalStateManagerImpl(final ProcessorTopology topology,
-                                  final Consumer<byte[], byte[]> consumer,
+                                  final Consumer<byte[], byte[]> globalConsumer,
                                   final StateDirectory stateDirectory,
                                   final StateRestoreListener stateRestoreListener) {
         this.topology = topology;
-        this.consumer = consumer;
+        this.globalConsumer = globalConsumer;
         this.stateDirectory = stateDirectory;
         this.baseDir = stateDirectory.globalStateDir();
         this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
@@ -136,19 +136,19 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
 
         log.info("Restoring state for global store {}", store.name());
         final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
-        final Map<TopicPartition, Long> highWatermarks = consumer.endOffsets(topicPartitions);
+        final Map<TopicPartition, Long> highWatermarks = globalConsumer.endOffsets(topicPartitions);
         try {
             restoreState(stateRestoreCallback, topicPartitions, highWatermarks, store.name());
             stores.put(store.name(), store);
         } finally {
-            consumer.assign(Collections.<TopicPartition>emptyList());
+            globalConsumer.unsubscribe();
         }
 
     }
 
     private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         final String sourceTopic = topology.storeToChangelogTopic().get(store.name());
-        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(sourceTopic);
+        final List<PartitionInfo> partitionInfos = globalConsumer.partitionsFor(sourceTopic);
         if (partitionInfos == null || partitionInfos.isEmpty()) {
             throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", sourceTopic, store.name()));
         }
@@ -165,15 +165,15 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                               final Map<TopicPartition, Long> highWatermarks,
                               final String storeName) {
         for (final TopicPartition topicPartition : topicPartitions) {
-            consumer.assign(Collections.singletonList(topicPartition));
+            globalConsumer.assign(Collections.singletonList(topicPartition));
             final Long checkpoint = checkpointableOffsets.get(topicPartition);
             if (checkpoint != null) {
-                consumer.seek(topicPartition, checkpoint);
+                globalConsumer.seek(topicPartition, checkpoint);
             } else {
-                consumer.seekToBeginning(Collections.singletonList(topicPartition));
+                globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
             }
 
-            long offset = consumer.position(topicPartition);
+            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             BatchingStateRestoreCallback
                 stateRestoreAdapter =
@@ -186,7 +186,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
             long restoreCount = 0L;
 
             while (offset < highWatermark) {
-                final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100);
                 final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
                 for (ConsumerRecord<byte[], byte[]> record : records) {
                     offset = record.offset() + 1;
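
[Editorial note] Replacing globalConsumer.assign(Collections.<TopicPartition>emptyList()) with globalConsumer.unsubscribe() (here and in StoreChangelogReader and TaskManager below) is behavior-preserving for a manually assigned consumer: both calls clear the current assignment. The second form states the intent directly and drops the explicit type witness. A side-by-side sketch:

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    final class AssignmentReset {
        // before: clears the assignment, but reads like a real (re)assignment
        static void clearViaAssign(final Consumer<byte[], byte[]> consumer) {
            consumer.assign(Collections.<TopicPartition>emptyList());
        }

        // after: same effect on the manual assignment, and the intent is explicit
        static void clearViaUnsubscribe(final Consumer<byte[], byte[]> consumer) {
            consumer.unsubscribe();
        }
    }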

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 1ee49e1..f3800fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -50,7 +50,7 @@ public class GlobalStreamThread extends Thread {
     private final Logger log;
     private final LogContext logContext;
     private final StreamsConfig config;
-    private final Consumer<byte[], byte[]> consumer;
+    private final Consumer<byte[], byte[]> globalConsumer;
     private final StateDirectory stateDirectory;
     private final Time time;
     private final ThreadCache cache;
@@ -183,7 +183,7 @@ public class GlobalStreamThread extends Thread {
         this.time = time;
         this.config = config;
         this.topology = topology;
-        this.consumer = globalConsumer;
+        this.globalConsumer = globalConsumer;
         this.stateDirectory = stateDirectory;
         long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
                 (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1));
@@ -197,7 +197,7 @@ public class GlobalStreamThread extends Thread {
     }
 
     static class StateConsumer {
-        private final Consumer<byte[], byte[]> consumer;
+        private final Consumer<byte[], byte[]> globalConsumer;
         private final GlobalStateMaintainer stateMaintainer;
         private final Time time;
         private final long pollMs;
@@ -207,13 +207,13 @@ public class GlobalStreamThread extends Thread {
         private long lastFlush;
 
         StateConsumer(final LogContext logContext,
-                      final Consumer<byte[], byte[]> consumer,
+                      final Consumer<byte[], byte[]> globalConsumer,
                       final GlobalStateMaintainer stateMaintainer,
                       final Time time,
                       final long pollMs,
                       final long flushInterval) {
             this.log = logContext.logger(getClass());
-            this.consumer = consumer;
+            this.globalConsumer = globalConsumer;
             this.stateMaintainer = stateMaintainer;
             this.time = time;
             this.pollMs = pollMs;
@@ -226,15 +226,15 @@ public class GlobalStreamThread extends Thread {
          */
         void initialize() {
             final Map<TopicPartition, Long> partitionOffsets = stateMaintainer.initialize();
-            consumer.assign(partitionOffsets.keySet());
+            globalConsumer.assign(partitionOffsets.keySet());
             for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
-                consumer.seek(entry.getKey(), entry.getValue());
+                globalConsumer.seek(entry.getKey(), entry.getValue());
             }
             lastFlush = time.milliseconds();
         }
 
         void pollAndUpdate() {
-            final ConsumerRecords<byte[], byte[]> received = consumer.poll(pollMs);
+            final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs);
             for (ConsumerRecord<byte[], byte[]> record : received) {
                 stateMaintainer.update(record);
             }
@@ -247,8 +247,8 @@ public class GlobalStreamThread extends Thread {
 
         public void close() throws IOException {
             try {
-                consumer.close();
-            } catch (Exception e) {
+                globalConsumer.close();
+            } catch (final RuntimeException e) {
                 // just log an error if the consumer throws an exception during close
                 // so we can always attempt to close the state stores.
                 log.error("Failed to close consumer due to the following error:", e);
@@ -291,7 +291,7 @@ public class GlobalStreamThread extends Thread {
 
             try {
                 stateConsumer.close();
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 log.error("Failed to close state maintainer due to the following error:", e);
             }
             setState(DEAD);
@@ -303,12 +303,12 @@ public class GlobalStreamThread extends Thread {
     private StateConsumer initialize() {
         try {
             final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology,
-                                                                           consumer,
+                                                                           globalConsumer,
                                                                            stateDirectory,
                                                                            stateRestoreListener);
             final StateConsumer stateConsumer
                     = new StateConsumer(this.logContext,
-                                        consumer,
+                                        globalConsumer,
                                         new GlobalStateUpdateTask(topology,
                                                                   new GlobalProcessorContextImpl(
                                                                           config,
@@ -323,9 +323,9 @@ public class GlobalStreamThread extends Thread {
                                         config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
             stateConsumer.initialize();
             return stateConsumer;
-        } catch (StreamsException e) {
+        } catch (final StreamsException e) {
             startupException = e;
-        } catch (Exception e) {
+        } catch (final Exception e) {
             startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", e);
         }
         return null;
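
[Editorial note] Several hunks above also narrow catch (Exception e) to catch (final RuntimeException e). Consumer.close() declares no checked exceptions, so a RuntimeException is the only thing that can escape it; catching Exception merely blurred that contract (the same reasoning applies to the KafkaClient.poll() call sites in StreamsKafkaClient below). A sketch of the narrowed pattern:

    import org.apache.kafka.clients.consumer.Consumer;

    final class QuietConsumerClose {
        static void closeQuietly(final Consumer<byte[], byte[]> consumer) {
            try {
                consumer.close();
            } catch (final RuntimeException e) {
                // close() declares no checked exceptions, so this clause
                // already covers everything that can actually be thrown
                System.err.println("failed to close consumer: " + e);
            }
        }
    }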

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index a038d09..ae2b375 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.slf4j.Logger;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -110,11 +109,7 @@ public class InternalTopicManager {
     }
 
     public void close() {
-        try {
-            streamsKafkaClient.close();
-        } catch (IOException e) {
-            log.warn("Could not close StreamsKafkaClient.");
-        }
+        streamsKafkaClient.close();
     }
 
     /**

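[Editorial note] This close() (like the KafkaStreams hunk above) no longer needs a try/catch because StreamsKafkaClient.close() stops declaring IOException and absorbs it internally; see the StreamsKafkaClient hunk below. A generic sketch of that pattern, with hypothetical names:

    import java.io.Closeable;
    import java.io.IOException;

    // hypothetical wrapper illustrating the cleanup: absorb a checked
    // exception that realistically cannot occur, so every caller's shutdown
    // path shrinks to a bare close() call
    final class QuietCloseable implements AutoCloseable {
        private final Closeable delegate;

        QuietCloseable(final Closeable delegate) {
            this.delegate = delegate;
        }

        @Override
        public void close() {  // note: no "throws IOException" anymore
            try {
                delegate.close();
            } catch (final IOException impossible) {
                // log instead of propagating; if this ever fires, it is a bug
                System.err.println("unexpected IOException on close: " + impossible);
            }
        }
    }
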
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index bbe570c..83c783d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.slf4j.Logger;
@@ -41,7 +42,7 @@ import java.util.Set;
 public class StoreChangelogReader implements ChangelogReader {
 
     private final Logger log;
-    private final Consumer<byte[], byte[]> consumer;
+    private final Consumer<byte[], byte[]> restoreConsumer;
     private final StateRestoreListener userStateRestoreListener;
     private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
     private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
@@ -49,10 +50,10 @@ public class StoreChangelogReader implements ChangelogReader {
     private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
 
-    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer,
+    public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
                                 final StateRestoreListener userStateRestoreListener,
                                 final LogContext logContext) {
-        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
         this.log = logContext.logger(getClass());
         this.userStateRestoreListener = userStateRestoreListener;
     }
@@ -73,26 +74,26 @@ public class StoreChangelogReader implements ChangelogReader {
         }
 
         if (needsRestoring.isEmpty()) {
-            consumer.assign(Collections.<TopicPartition>emptyList());
+            restoreConsumer.unsubscribe();
             return completed();
         }
 
         final Set<TopicPartition> partitions = new HashSet<>(needsRestoring.keySet());
-        final ConsumerRecords<byte[], byte[]> allRecords = consumer.poll(10);
+        final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
         for (final TopicPartition partition : partitions) {
             restorePartition(allRecords, partition, active.restoringTaskFor(partition));
         }
 
         if (needsRestoring.isEmpty()) {
-            consumer.assign(Collections.<TopicPartition>emptyList());
+            restoreConsumer.unsubscribe();
         }
 
         return completed();
     }
 
     private void initialize() {
-        if (!consumer.subscription().isEmpty()) {
-            throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + consumer.subscription() + ")");
+        if (!restoreConsumer.subscription().isEmpty()) {
+            throw new StreamsException("Restore consumer should not be subscribed to any topics (" + restoreConsumer.subscription() + ")");
         }
 
         // first refresh the changelog partition information from brokers, since initialize is only called when
@@ -110,7 +111,7 @@ public class StoreChangelogReader implements ChangelogReader {
         // try to fetch end offsets for the initializable restorers and remove any partitions
         // where we already have all of the data
         try {
-            endOffsets.putAll(consumer.endOffsets(initializable.keySet()));
+            endOffsets.putAll(restoreConsumer.endOffsets(initializable.keySet()));
         } catch (final TimeoutException e) {
             // if timeout exception gets thrown we just give up this time and retry in the next run loop
             log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable);
@@ -151,27 +152,27 @@ public class StoreChangelogReader implements ChangelogReader {
     private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
         log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
 
-        final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());
+        final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
         assignment.addAll(initialized.keySet());
-        consumer.assign(assignment);
+        restoreConsumer.assign(assignment);
 
         final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
         for (final StateRestorer restorer : initialized.values()) {
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
-                consumer.seek(restorer.partition(), restorer.checkpoint());
+                restoreConsumer.seek(restorer.partition(), restorer.checkpoint());
                 logRestoreOffsets(restorer.partition(),
                         restorer.checkpoint(),
                         endOffsets.get(restorer.partition()));
-                restorer.setStartingOffset(consumer.position(restorer.partition()));
+                restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
                 restorer.restoreStarted();
             } else {
-                consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+                restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
                 needsPositionUpdate.add(restorer);
             }
         }
 
         for (final StateRestorer restorer : needsPositionUpdate) {
-            final long position = consumer.position(restorer.partition());
+            final long position = restoreConsumer.position(restorer.partition());
             logRestoreOffsets(restorer.partition(),
                     position,
                     endOffsets.get(restorer.partition()));
@@ -200,7 +201,7 @@ public class StoreChangelogReader implements ChangelogReader {
 
     private void refreshChangelogInfo() {
         try {
-            partitionInfo.putAll(consumer.listTopics());
+            partitionInfo.putAll(restoreConsumer.listTopics());
         } catch (final TimeoutException e) {
             log.debug("Could not fetch topic metadata within the timeout, will retry in the next run loop");
         }
@@ -270,7 +271,7 @@ public class StoreChangelogReader implements ChangelogReader {
         }
 
         if (offset == -1) {
-            offset = consumer.position(restorer.partition());
+            offset = restoreConsumer.position(restorer.partition());
         }
 
         if (!restoreRecords.isEmpty()) {
@@ -278,7 +279,7 @@ public class StoreChangelogReader implements ChangelogReader {
             restorer.restoreBatchCompleted(offset + 1, records.size());
         }
 
-        return consumer.position(restorer.partition());
+        return restoreConsumer.position(restorer.partition());
     }
 
     private boolean hasPartition(final TopicPartition topicPartition) {

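[Editorial note] Switching the subscription guard from IllegalStateException to StreamsException keeps an internal invariant violation inside the Streams exception hierarchy, so it reaches the application through the same channel as other fatal Streams errors, typically the thread's uncaught exception handler. A minimal sketch of that caller-side channel:

    import org.apache.kafka.streams.KafkaStreams;

    final class FatalErrorHandling {
        static void installHandler(final KafkaStreams streams) {
            streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(final Thread t, final Throwable e) {
                    // a StreamsException here marks a Streams-level fatal error
                    System.err.println("fatal error in " + t.getName() + ": " + e);
                }
            });
        }
    }
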
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 1e99ad2..075f445 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -51,6 +52,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.BrokerNotFoundException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -85,16 +87,18 @@ public class StreamsKafkaClient {
     private final KafkaClient kafkaClient;
     private final List<MetricsReporter> reporters;
     private final Config streamsConfig;
+    private final Logger log;
     private final Map<String, String> defaultTopicConfigs = new HashMap<>();
     private static final int MAX_INFLIGHT_REQUESTS = 100;
 
-
     StreamsKafkaClient(final Config streamsConfig,
                        final KafkaClient kafkaClient,
-                       final List<MetricsReporter> reporters) {
+                       final List<MetricsReporter> reporters,
+                       final LogContext log) {
         this.streamsConfig = streamsConfig;
         this.kafkaClient = kafkaClient;
         this.reporters = reporters;
+        this.log = log.logger(StreamsKafkaClient.class);
         extractDefaultTopicConfigs(streamsConfig.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX));
     }
 
@@ -114,17 +118,18 @@ public class StreamsKafkaClient {
         final String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);
         metricTags.put("client-id", clientId);
 
-        final Metadata metadata = new Metadata(streamsConfig.getLong(
-                StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
-                                               streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG), false);
+        final Metadata metadata = new Metadata(streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
+                                               streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG),
+                                               false);
         final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
         metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
 
         final MetricConfig metricConfig = new MetricConfig().samples(streamsConfig.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
                 .timeWindow(streamsConfig.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                 .tags(metricTags);
-        final List<MetricsReporter> reporters = streamsConfig.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                                                         MetricsReporter.class);
+        final List<MetricsReporter> reporters = streamsConfig.getConfiguredInstances(
+            ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+            MetricsReporter.class);
         // TODO: This should come from the KafkaStream
         reporters.add(new JmxReporter("kafka.admin.client"));
         final Metrics metrics = new Metrics(metricConfig, reporters, time);
@@ -154,7 +159,7 @@ public class StreamsKafkaClient {
                 true,
                 new ApiVersions(),
                 logContext);
-        return new StreamsKafkaClient(streamsConfig, kafkaClient, reporters);
+        return new StreamsKafkaClient(streamsConfig, kafkaClient, reporters, logContext);
     }
 
     private static LogContext createLogContext(String clientId) {
@@ -165,9 +170,13 @@ public class StreamsKafkaClient {
         return create(Config.fromStreamsConfig(streamsConfig));
     }
 
-    public void close() throws IOException {
+    public void close() {
         try {
             kafkaClient.close();
+        } catch (final IOException impossible) {
+            // this can actually never happen, because NetworkClient doesn't throw any exception on close()
+            // we log just in case
+            log.error("This error indicates a bug in the code. Please report to dev@kafka.apache.org.", impossible);
         } finally {
             for (MetricsReporter metricsReporter: this.reporters) {
                 metricsReporter.close();
@@ -177,17 +186,22 @@ public class StreamsKafkaClient {
 
     /**
      * Create a set of new topics using batch request.
+     *
+     * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
+     * @throws TimeoutException if there was no response within {@code request.timeout.ms}
+     * @throws StreamsException for any other fatal error
      */
-    public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor,
-                             final long windowChangeLogAdditionalRetention, final MetadataResponse metadata) {
-
+    public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap,
+                             final int replicationFactor,
+                             final long windowChangeLogAdditionalRetention,
+                             final MetadataResponse metadata) {
         final Map<String, CreateTopicsRequest.TopicDetails> topicRequestDetails = new HashMap<>();
-        for (Map.Entry<InternalTopicConfig, Integer> entry : topicsMap.entrySet()) {
-            InternalTopicConfig internalTopicConfig = entry.getKey();
-            Integer partitions = entry.getValue();
+        for (final Map.Entry<InternalTopicConfig, Integer> entry : topicsMap.entrySet()) {
+            final InternalTopicConfig internalTopicConfig = entry.getKey();
+            final Integer partitions = entry.getValue();
             final Properties topicProperties = internalTopicConfig.toProperties(windowChangeLogAdditionalRetention);
             final Map<String, String> topicConfig = new HashMap<>(defaultTopicConfigs);
-            for (String key : topicProperties.stringPropertyNames()) {
+            for (final String key : topicProperties.stringPropertyNames()) {
                 topicConfig.put(key, topicProperties.getProperty(key));
             }
             final CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(
@@ -205,7 +219,7 @@ public class StreamsKafkaClient {
                 streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG)),
             Time.SYSTEM.milliseconds(),
             true);
-        final ClientResponse clientResponse = sendRequest(clientRequest);
+        final ClientResponse clientResponse = sendRequestSync(clientRequest);
 
         if (!clientResponse.hasResponse()) {
             throw new StreamsException("Empty response for client request.");
@@ -228,6 +242,7 @@ public class StreamsKafkaClient {
      *
      * @param nodes List of nodes to pick from.
      * @return The first node that is ready to accept requests.
+     * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
      */
     private String ensureOneNodeIsReady(final List<Node> nodes) {
         String brokerId = null;
@@ -243,7 +258,7 @@ public class StreamsKafkaClient {
             }
             try {
                 kafkaClient.poll(50, Time.SYSTEM.milliseconds());
-            } catch (final Exception e) {
+            } catch (final RuntimeException e) {
                 throw new StreamsException("Could not poll.", e);
             }
         }
@@ -257,9 +272,9 @@ public class StreamsKafkaClient {
     }
 
     /**
-     *
      * @return if Id of the controller node, or an exception if no controller is found or
      * controller is not ready
+     * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
      */
     private String getControllerReadyBrokerId(final MetadataResponse metadata) {
         return ensureOneNodeIsReady(Collections.singletonList(metadata.controller()));
@@ -267,6 +282,7 @@ public class StreamsKafkaClient {
 
     /**
      * @return the Id of any broker that is ready, or an exception if no broker is ready.
+     * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
      */
     private String getAnyReadyBrokerId() {
         final Metadata metadata = new Metadata(
@@ -280,26 +296,32 @@ public class StreamsKafkaClient {
         return ensureOneNodeIsReady(nodes);
     }
 
-    private ClientResponse sendRequest(final ClientRequest clientRequest) {
+    /**
+     * @return the response to the request
+     * @throws TimeoutException if there was no response within {@code request.timeout.ms}
+     * @throws StreamsException any other fatal error
+     */
+    private ClientResponse sendRequestSync(final ClientRequest clientRequest) {
         try {
             kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
-        } catch (final Exception e) {
+        } catch (final RuntimeException e) {
             throw new StreamsException("Could not send request.", e);
         }
-        final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
+
         // Poll for the response.
+        final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
         while (Time.SYSTEM.milliseconds() < responseTimeout) {
             final List<ClientResponse> responseList;
             try {
                 responseList = kafkaClient.poll(100, Time.SYSTEM.milliseconds());
-            } catch (final IllegalStateException e) {
+            } catch (final RuntimeException e) {
                 throw new StreamsException("Could not poll.", e);
             }
             if (!responseList.isEmpty()) {
                 if (responseList.size() > 1) {
                     throw new StreamsException("Sent one request but received multiple or no responses.");
                 }
-                ClientResponse response = responseList.get(0);
+                final ClientResponse response = responseList.get(0);
                 if (response.requestHeader().correlationId() == clientRequest.correlationId()) {
                     return response;
                 } else {
@@ -309,21 +331,24 @@ public class StreamsKafkaClient {
                 }
             }
         }
-        throw new StreamsException("Failed to get response from broker within timeout");
 
+        throw new TimeoutException("Failed to get response from broker within timeout");
     }
 
     /**
-     * Fetch the metadata for all topics
+     * Fetch the metadata for all topics.
+     *
+     * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
+     * @throws TimeoutException if there was no response within {@code request.timeout.ms}
+     * @throws StreamsException for any other fatal error
      */
     public MetadataResponse fetchMetadata() {
-
         final ClientRequest clientRequest = kafkaClient.newClientRequest(
             getAnyReadyBrokerId(),
             MetadataRequest.Builder.allTopics(),
             Time.SYSTEM.milliseconds(),
             true);
-        final ClientResponse clientResponse = sendRequest(clientRequest);
+        final ClientResponse clientResponse = sendRequestSync(clientRequest);
 
         if (!clientResponse.hasResponse()) {
             throw new StreamsException("Empty response for client request.");
@@ -342,7 +367,10 @@ public class StreamsKafkaClient {
      * Note, for <em>pre</em> 0.10.x brokers the broker version cannot be checked and the client will hang and retry
      * until it {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}.
      *
+     * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
+     * @throws TimeoutException if there was no response within {@code request.timeout.ms}
      * @throws StreamsException if brokers have version 0.10.0.x
+     * @throws StreamsException for any other fatal error
      */
     public void checkBrokerCompatibility(final boolean eosEnabled) throws StreamsException {
         final ClientRequest clientRequest = kafkaClient.newClientRequest(
@@ -351,7 +379,7 @@ public class StreamsKafkaClient {
             Time.SYSTEM.milliseconds(),
             true);
 
-        final ClientResponse clientResponse = sendRequest(clientRequest);
+        final ClientResponse clientResponse = sendRequestSync(clientRequest);
         if (!clientResponse.hasResponse()) {
             throw new StreamsException("Empty response for client request.");
         }
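
[Editorial note] Taken together, the renamed sendRequestSync() gives StreamsKafkaClient callers a three-way contract: BrokerNotFoundException when no broker connection could be established within request.timeout.ms, TimeoutException when a request got no response in time, and StreamsException for anything else. A hedged sketch of a caller reacting per failure mode; the single-retry policy is hypothetical:

    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.common.requests.MetadataResponse;
    import org.apache.kafka.streams.errors.BrokerNotFoundException;
    import org.apache.kafka.streams.errors.StreamsException;
    import org.apache.kafka.streams.processor.internals.StreamsKafkaClient;

    final class MetadataFetch {
        // hypothetical single-retry policy around the contract documented above
        static MetadataResponse fetchWithOneRetry(final StreamsKafkaClient client) {
            try {
                return client.fetchMetadata();
            } catch (final BrokerNotFoundException e) {
                // no broker connection within request.timeout.ms; retry once
                return client.fetchMetadata();
            } catch (final TimeoutException e) {
                // request sent, but no response within request.timeout.ms; retry once
                return client.fetchMetadata();
            } catch (final StreamsException fatal) {
                // anything else is fatal; let it propagate
                throw fatal;
            }
        }
    }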

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 8dc477f..6f4cc51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -197,7 +196,7 @@ class TaskManager {
         firstException.compareAndSet(null, active.suspend());
         firstException.compareAndSet(null, standby.suspend());
         // remove the changelog partitions from restore consumer
-        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+        restoreConsumer.unsubscribe();
 
         final Exception exception = firstException.get();
         if (exception != null) {
@@ -223,7 +222,7 @@ class TaskManager {
             log.error("Failed to close KafkaStreamClient due to the following error:", e);
         }
         // remove the changelog partitions from restore consumer
-        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+        restoreConsumer.unsubscribe();
         taskCreator.close();
         standbyTaskCreator.close();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
index 8fcdd03..c1b1021 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -40,21 +40,23 @@ public class HostInfo {
     private final String host;
     private final int port;
 
-    public HostInfo(String host, int port) {
+    public HostInfo(final String host,
+                    final int port) {
         this.host = host;
         this.port = port;
     }
 
     @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        HostInfo hostInfo = (HostInfo) o;
-
-        if (port != hostInfo.port) return false;
-        return host.equals(hostInfo.host);
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
 
+        final HostInfo hostInfo = (HostInfo) o;
+        return port == hostInfo.port && host.equals(hostInfo.host);
     }
 
     @Override

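[Editorial note] The diff cuts off just before hashCode(). For the equals()/hashCode() contract to hold, the companion hashCode() must combine exactly the two fields the tightened equals() compares, host and port. A typical shape, shown as an illustration of the contract (assuming the host and port fields above) rather than as the committed code:

    @Override
    public int hashCode() {
        // combines exactly the fields compared in equals(): host and port
        int result = host.hashCode();
        result = 31 * result + port;
        return result;
    }
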
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 1cd6bee..7d032a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -18,8 +18,10 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
@@ -33,10 +35,12 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
+import static org.junit.Assert.assertEquals;
 
 public class InternalTopicManagerTest {
 
@@ -44,6 +48,7 @@ public class InternalTopicManagerTest {
     private final String userEndPoint = "localhost:2171";
     private MockStreamKafkaClient streamsKafkaClient;
     private final Time time = new MockTime();
+
     @Before
     public void init() {
         final StreamsConfig config = new StreamsConfig(configProps());
@@ -57,54 +62,83 @@ public class InternalTopicManagerTest {
 
     @Test
     public void shouldReturnCorrectPartitionCounts() {
-        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
-            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
-        Assert.assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
+        final InternalTopicManager internalTopicManager = new InternalTopicManager(
+            streamsKafkaClient,
+            1,
+            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
+            time);
+        assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
     }
 
     @Test
     public void shouldCreateRequiredTopics() {
-        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
-            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
-        internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1));
+        streamsKafkaClient.returnNoMetadata = true;
+
+        final InternalTopicManager internalTopicManager = new InternalTopicManager(
+            streamsKafkaClient,
+            1,
+            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
+            time);
+
+        final InternalTopicConfig topicConfig = new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null);
+        internalTopicManager.makeReady(Collections.singletonMap(topicConfig, 1));
+
+        assertEquals(Collections.singletonMap(topic, topicConfig), streamsKafkaClient.createdTopics);
+        assertEquals(Collections.singletonMap(topic, 1), streamsKafkaClient.numberOfPartitionsPerTopic);
+        assertEquals(Collections.singletonMap(topic, 1), streamsKafkaClient.replicationFactorPerTopic);
     }
 
     @Test
     public void shouldNotCreateTopicIfExistsWithDifferentPartitions() {
-        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
-            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
-        boolean exceptionWasThrown = false;
+        final InternalTopicManager internalTopicManager = new InternalTopicManager(
+            streamsKafkaClient,
+            1,
+            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
+            time);
         try {
             internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 2));
-        } catch (StreamsException e) {
-            exceptionWasThrown = true;
-        }
-        Assert.assertTrue(exceptionWasThrown);
+            Assert.fail("Should have thrown StreamsException");
+        } catch (StreamsException expected) { /* pass */ }
     }
 
     @Test
     public void shouldNotThrowExceptionIfExistsWithDifferentReplication() {
 
         // create topic the first time with replication 2
-        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 2,
-            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
-        internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1));
+        final InternalTopicManager internalTopicManager = new InternalTopicManager(
+            streamsKafkaClient,
+            2,
+            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
+            time);
+        internalTopicManager.makeReady(Collections.singletonMap(
+            new InternalTopicConfig(topic,
+                                    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                    null),
+            1));
 
         // attempt to create it again with replication 1
-        InternalTopicManager internalTopicManager2 = new InternalTopicManager(streamsKafkaClient, 1,
-            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
-        try {
-            internalTopicManager2.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1));
-        } catch (StreamsException e) {
-            Assert.fail("did not expect an exception since topic is already there.");
-        }
+        final InternalTopicManager internalTopicManager2 = new InternalTopicManager(
+            streamsKafkaClient,
+            1,
+            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
+            time);
+
+        internalTopicManager2.makeReady(Collections.singletonMap(
+            new InternalTopicConfig(topic,
+                                    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                    null),
+            1));
     }
 
     @Test
     public void shouldNotThrowExceptionForEmptyTopicMap() {
-        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
-            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
-        internalTopicManager.makeReady(Collections.EMPTY_MAP);
+        final InternalTopicManager internalTopicManager = new InternalTopicManager(
+            streamsKafkaClient,
+            1,
+            WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
+            time);
+
+        internalTopicManager.makeReady(Collections.<InternalTopicConfig, Integer>emptyMap());
     }
 
     private Properties configProps() {
@@ -120,24 +154,54 @@ public class InternalTopicManagerTest {
 
     private class MockStreamKafkaClient extends StreamsKafkaClient {
 
+        boolean returnNoMetadata = false;
+
+        final Map<String, InternalTopicConfig> createdTopics = new HashMap<>();
+        final Map<String, Integer> numberOfPartitionsPerTopic = new HashMap<>();
+        final Map<String, Integer> replicationFactorPerTopic = new HashMap<>();
+
         MockStreamKafkaClient(final StreamsConfig streamsConfig) {
-            super(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig), new MockClient(new MockTime()), Collections.EMPTY_LIST);
+            super(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig),
+                  new MockClient(new MockTime()),
+                  Collections.<MetricsReporter>emptyList(),
+                  new LogContext());
         }
 
         @Override
-        public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor,
-                                 final long windowChangeLogAdditionalRetention, final MetadataResponse metadata) {
-            // do nothing
+        public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap,
+                                 final int replicationFactor,
+                                 final long windowChangeLogAdditionalRetention,
+                                 final MetadataResponse metadata) {
+            for (final Map.Entry<InternalTopicConfig, Integer> topic : topicsMap.entrySet()) {
+                final InternalTopicConfig config = topic.getKey();
+                final String topicName = config.name();
+                createdTopics.put(topicName, config);
+                numberOfPartitionsPerTopic.put(topicName, topic.getValue());
+                replicationFactorPerTopic.put(topicName, replicationFactor);
+            }
         }
 
         @Override
         public MetadataResponse fetchMetadata() {
-            Node node = new Node(1, "host1", 1001);
-            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>(), new ArrayList<Node>());
-            MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
-            MetadataResponse response = new MetadataResponse(Collections.<Node>singletonList(node), null, MetadataResponse.NO_CONTROLLER_ID,
-                Collections.singletonList(topicMetadata));
-            return response;
+            final Node node = new Node(1, "host1", 1001);
+            final MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>(), new ArrayList<Node>());
+            final MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
+            final MetadataResponse metadataResponse;
+            if (returnNoMetadata) {
+                metadataResponse = new MetadataResponse(
+                    Collections.<Node>singletonList(node),
+                    null,
+                    MetadataResponse.NO_CONTROLLER_ID,
+                    Collections.<MetadataResponse.TopicMetadata>emptyList());
+            } else {
+                metadataResponse = new MetadataResponse(
+                    Collections.<Node>singletonList(node),
+                    null,
+                    MetadataResponse.NO_CONTROLLER_ID,
+                    Collections.singletonList(topicMetadata));
+            }
+
+            return metadataResponse;
         }
     }
 }

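A note on the mock above: MockStreamKafkaClient is no longer a no-op. Its
createTopics() override records the topic config, partition count, and
replication factor per topic so that shouldCreateRequiredTopics can assert on
them, and the new returnNoMetadata flag makes fetchMetadata() report an empty
topic list, so makeReady() sees the topic as missing and actually takes the
creation path. A minimal, self-contained sketch of this recording-mock
pattern (class and method names here are illustrative, not Kafka API):

    import java.util.HashMap;
    import java.util.Map;

    class RecordingTopicCreator {
        // record the arguments of each call instead of performing the action
        final Map<String, Integer> createdPartitions = new HashMap<>();

        void createTopic(final String name, final int partitions) {
            createdPartitions.put(name, partitions);
        }
    }
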
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 3c54851..705bcf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -25,10 +25,12 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.MockStateRestoreListener;
+import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
 import org.easymock.MockType;
@@ -94,11 +96,18 @@ public class StoreChangelogReaderTest {
 
     @Test
     public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
+        final StateRestorer mockRestorer = EasyMock.mock(StateRestorer.class);
+        mockRestorer.setUserRestoreListener(stateRestoreListener);
+        expect(mockRestorer.partition()).andReturn(new TopicPartition("sometopic", 0)).times(2);
+        EasyMock.replay(mockRestorer);
+        changelogReader.register(mockRestorer);
+
         consumer.subscribe(Collections.singleton("sometopic"));
+
         try {
             changelogReader.restore(active);
             fail("Should have thrown IllegalStateException");
-        } catch (final IllegalStateException e) {
+        } catch (final StreamsException expected) {
             // ok
         }
     }

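Two things change in this test: a mocked StateRestorer is registered before
restore() is called, so the reader actually has work pending, and the expected
exception type is now StreamsException rather than IllegalStateException,
presumably because restore() wraps the pre-existing consumer subscription in a
StreamsException. The times(2) recording above is EasyMock shorthand for
chaining two identical returns; a sketch of the equivalence, assumed to sit
inside a test method:

    final StateRestorer restorer = EasyMock.mock(StateRestorer.class);
    final TopicPartition tp = new TopicPartition("sometopic", 0);
    // the two recordings below are interchangeable; use one or the other
    expect(restorer.partition()).andReturn(tp).andReturn(tp);
    expect(restorer.partition()).andReturn(tp).times(2);
    EasyMock.replay(restorer);
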
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
index 0bb7682..a399dd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -214,8 +215,9 @@ public class StreamsKafkaClientTest {
     private StreamsKafkaClient createStreamsKafkaClient() {
         final StreamsConfig streamsConfig = new StreamsConfig(config);
         return new StreamsKafkaClient(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig),
-                                                                             kafkaClient,
-                                                                             reporters);
+                                      kafkaClient,
+                                      reporters,
+                                      new LogContext());
     }
 
 

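The extra constructor argument mirrors the production change: StreamsKafkaClient
now takes a LogContext, which stamps every logger created from it with a shared
prefix so log lines from different client instances can be told apart. A short
sketch of typical LogContext usage (the prefix string is illustrative):

    import org.apache.kafka.common.utils.LogContext;
    import org.slf4j.Logger;

    final LogContext logContext = new LogContext("[StreamsKafkaClient test] ");
    final Logger log = logContext.logger(StreamsKafkaClientTest.class);
    log.debug("logger output is prefixed per client instance");
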
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 67dd6c0..b11b8c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -218,7 +218,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldUnassignChangelogPartitionsOnSuspend() {
-        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+        restoreConsumer.unsubscribe();
         EasyMock.expectLastCall();
         replay();
 
@@ -231,7 +231,7 @@ public class TaskManagerTest {
         EasyMock.expect(active.suspend()).andReturn(new RuntimeException(""));
         EasyMock.expect(standby.suspend()).andReturn(new RuntimeException(""));
         EasyMock.expectLastCall();
-        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+        restoreConsumer.unsubscribe();
 
         replay();
         try {
@@ -265,7 +265,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldUnassignChangelogPartitionsOnShutdown() {
-        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+        restoreConsumer.unsubscribe();
         EasyMock.expectLastCall();
         replay();
 
@@ -483,8 +483,6 @@ public class TaskManagerTest {
         EasyMock.replay(task);
     }
 
-
-
     private void mockStandbyTaskExpectations() {
         mockThreadMetadataProvider(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
         expect(standbyTaskCreator.createTasks(EasyMock.<Consumer<byte[], byte[]>>anyObject(),

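The expectation change in TaskManagerTest reflects a behavioral cleanup: both
assign(Collections.<TopicPartition>emptyList()) and unsubscribe() leave the
restore consumer without assigned partitions, but unsubscribe() also clears
any topic subscription in the same call, making the intent of "release
everything" explicit. A sketch of the difference (method name is illustrative):

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    void releaseChangelogPartitions(final KafkaConsumer<byte[], byte[]> restoreConsumer) {
        // clears direct assignments and any subscription state in one call,
        // unlike assign(emptyList()) which only resets the assignment
        restoreConsumer.unsubscribe();
    }
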
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index 67d5877..15d67b9 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -196,7 +196,7 @@ class StreamsBrokerBounceTest(Test):
         time.sleep(sleep_time_secs)
 
         # Fail brokers
-        self.fail_broker_type(failure_mode, broker_type);
+        self.fail_broker_type(failure_mode, broker_type)
 
         return self.collect_results(sleep_time_secs)
 
@@ -216,7 +216,7 @@ class StreamsBrokerBounceTest(Test):
         time.sleep(sleep_time_secs)
 
         # Fail brokers
-        self.fail_broker_type(failure_mode, broker_type);
+        self.fail_broker_type(failure_mode, broker_type)
 
         return self.collect_results(sleep_time_secs)
 
@@ -234,7 +234,7 @@ class StreamsBrokerBounceTest(Test):
         time.sleep(120)
 
         # Fail brokers
-        self.fail_many_brokers(failure_mode, num_failures);
+        self.fail_many_brokers(failure_mode, num_failures)
 
         return self.collect_results(120)
 
@@ -252,6 +252,6 @@ class StreamsBrokerBounceTest(Test):
         time.sleep(120)
 
         # Fail brokers
-        self.fail_many_brokers(failure_mode, num_failures);
+        self.fail_many_brokers(failure_mode, num_failures)
 
         return self.collect_results(120)