You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/09/19 23:13:14 UTC

kafka git commit: MINOR: various random minor fixes and improve KafkaConsumer JavaDocs

Repository: kafka
Updated Branches:
  refs/heads/trunk 80e0af50d -> bd0146d98


MINOR: various random minor fixes and improve KafkaConsumer JavaDocs

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3884 from mjsax/minor-fixed-discoverd-via-exception-handling-investigation


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd0146d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd0146d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd0146d9

Branch: refs/heads/trunk
Commit: bd0146d984dd5df82fb19aa936e8f4ff9ca40030
Parents: 80e0af5
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Sep 20 07:13:03 2017 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Sep 20 07:13:03 2017 +0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 104 ++++++++++------
 .../producer/BufferExhaustedException.java      |   2 +-
 .../kafka/clients/producer/KafkaProducer.java   |  15 ++-
 .../apache/kafka/clients/producer/Producer.java |  33 +++---
 .../kafka/common/metrics/KafkaMetric.java       |   1 -
 .../main/scala/kafka/tools/StreamsResetter.java |   2 -
 .../org/apache/kafka/streams/KafkaStreams.java  | 118 ++++++++++---------
 .../org/apache/kafka/streams/StreamsConfig.java |   8 +-
 .../internals/AbstractProcessorContext.java     |  15 +--
 .../processor/internals/AbstractTask.java       |   1 +
 .../processor/internals/AssignedTasks.java      |   8 +-
 .../internals/InternalTopologyBuilder.java      |   2 +-
 .../processor/internals/RecordQueue.java        |  28 ++---
 .../internals/SourceNodeRecordDeserializer.java |   4 +-
 .../processor/internals/StandbyTask.java        |  37 +++---
 .../internals/StoreChangelogReader.java         |   9 +-
 .../streams/processor/internals/StreamTask.java |  21 ++--
 .../processor/internals/StreamThread.java       |  26 ++--
 .../processor/internals/TaskManager.java        |  20 +---
 19 files changed, 251 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 76e4073..c2f2f5f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -112,7 +112,7 @@ import java.util.regex.Pattern;
  * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide the work of consuming and
  * processing records. These processes can either be running on the same machine or they can be
  * distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances
- * sharing the same <code>group.id</code> will be part of the same consumer group.
+ * sharing the same {@code group.id} will be part of the same consumer group.
  * <p>
  * Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the
  * {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe} APIs. Kafka will deliver each message in the
@@ -152,12 +152,12 @@ import java.util.regex.Pattern;
  * invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer
  * will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
  * the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for
- * a duration of <code>session.timeout.ms</code>, then the consumer will be considered dead and its partitions will
+ * a duration of {@code session.timeout.ms}, then the consumer will be considered dead and its partitions will
  * be reassigned.
  * <p>
  * It is also possible that the consumer could encounter a "livelock" situation where it is continuing
  * to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions
- * indefinitely in this case, we provide a liveness detection mechanism using the <code>max.poll.interval.ms</code>
+ * indefinitely in this case, we provide a liveness detection mechanism using the {@code max.poll.interval.ms}
  * setting. Basically if you don't call poll at least as frequently as the configured max interval,
  * then the client will proactively leave the group so that another consumer can take over its partitions. When this happens,
  * you may see an offset commit failure (as indicated by a {@link CommitFailedException} thrown from a call to {@link #commitSync()}).
@@ -211,15 +211,15 @@ import java.util.regex.Pattern;
  * </pre>
  *
  * The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the
- * configuration <code>bootstrap.servers</code>. This list is just used to discover the rest of the brokers in the
+ * configuration {@code bootstrap.servers}. This list is just used to discover the rest of the brokers in the
  * cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in
  * case there are servers down when the client is connecting).
  * <p>
- * Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by
- * the config <code>auto.commit.interval.ms</code>.
+ * Setting {@code enable.auto.commit} means that offsets are committed automatically with a frequency controlled by
+ * the config {@code auto.commit.interval.ms}.
  * <p>
  * In this example the consumer is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers
- * called <i>test</i> as configured with <code>group.id</code>.
+ * called <i>test</i> as configured with {@code group.id}.
  * <p>
  * The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
  * are saying that our record's key and value will just be simple strings.
@@ -423,8 +423,7 @@ import java.util.regex.Pattern;
  * <p>
  * Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically.
  * In order for this to work, consumers reading from these partitions should be configured to only read committed data.
- * This can be achieved by by setting the <code>isolation.level=read_committed</code> in the consumer's configuration.
- * </p>
+ * This can be achieved by setting the {@code isolation.level=read_committed} in the consumer's configuration.
  *
  * <p>
  * In <code>read_committed</code> mode, the consumer will read only those transactional messages which have been
@@ -433,17 +432,19 @@ import java.util.regex.Pattern;
  * consumer would be the offset of the first message in the partition belonging to an open transaction. This offset
  * is known as the 'Last Stable Offset'(LSO).</p>
  *
- * <p>A </p><code>read_committed</code> consumer will only read up till the LSO and filter out any transactional
+ * <p>
+ * A {@code read_committed} consumer will only read up to the LSO and filter out any transactional
  * messages which have been aborted. The LSO also affects the behavior of {@link #seekToEnd(Collection)} and
- * {@link #endOffsets(Collection)} for <code>read_committed</code> consumers, details of which are in each method's documentation.
- * Finally, the fetch lag metrics are also adjusted to be relative to the LSO for <code>read_committed</code> consumers.</p>
+ * {@link #endOffsets(Collection)} for {@code read_committed} consumers, details of which are in each method's documentation.
+ * Finally, the fetch lag metrics are also adjusted to be relative to the LSO for {@code read_committed} consumers.
  *
- * <p>Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction.
+ * <p>
+ * Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction.
  * There markers are not returned to applications, yet have an offset in the log. As a result, applications reading from
  * topics with transactional messages will see gaps in the consumed offsets. These missing messages would be the transaction
  * markers, and they are filtered out for consumers in both isolation levels. Additionally, applications using
- * <code>read_committed</code> consumers may also see gaps due to aborted transactions, since those messages would not
- * be returned by the consumer and yet would have valid offsets.</p>
+ * {@code read_committed} consumers may also see gaps due to aborted transactions, since those messages would not
+ * be returned by the consumer and yet would have valid offsets.
  *
  * <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
  *
@@ -869,7 +870,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param topics The list of topics to subscribe to
      * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
      *                 subscribed topics
-     * @throws IllegalArgumentException If topics is null or contains null or empty elements
+     * @throws IllegalArgumentException If topics is null or contains null or empty elements, or if listener is null
+     * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
+     *                               previously (without a subsequent call to {@link #unsubscribe()})
      */
     @Override
     public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
@@ -911,6 +914,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param topics The list of topics to subscribe to
      * @throws IllegalArgumentException If topics is null or contains null or empty elements
+     * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
+     *                               previously (without a subsequent call to {@link #unsubscribe()})
      */
     @Override
     public void subscribe(Collection<String> topics) {
@@ -918,9 +923,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. The pattern matching will be done periodically against topics
-     * existing at the time of check.
-     *
+     * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+     * The pattern matching will be done periodically against topics existing at the time of check.
      * <p>
      * As part of group management, the consumer will keep track of the list of consumers that
      * belong to a particular group and will trigger a rebalance operation if one of the
@@ -935,7 +939,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param pattern Pattern to subscribe to
      * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
      *                 subscribed topics
-     * @throws IllegalArgumentException If pattern is null
+     * @throws IllegalArgumentException If pattern or listener is null
+     * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
+     *                               previously (without a subsequent call to {@link #unsubscribe()})
      */
     @Override
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
@@ -956,7 +962,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
      * The pattern matching will be done periodically against topics existing at the time of check.
-     *
      * <p>
      * This is a short-hand for {@link #subscribe(Pattern, ConsumerRebalanceListener)}, which
      * uses a noop listener. If you need the ability to seek to particular offsets, you should prefer
@@ -966,6 +971,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param pattern Pattern to subscribe to
      * @throws IllegalArgumentException If pattern is null
+     * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
+     *                               previously (without a subsequent call to {@link #unsubscribe()})
      */
     @Override
     public void subscribe(Pattern pattern) {
@@ -973,8 +980,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}. This
-     * also clears any partitions directly assigned through {@link #assign(Collection)}.
+     * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}.
+     * This also clears any partitions directly assigned through {@link #assign(Collection)}.
      */
     public void unsubscribe() {
         acquireAndEnsureOpen();
@@ -991,17 +998,21 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
      * and will replace the previous assignment (if there is one).
-     *
+     * <p>
      * If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}.
-     *
      * <p>
      * Manual topic assignment through this method does not use the consumer's group management
      * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
      * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
      * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
+     * <p>
+     * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
+     * assignment replaces the old one.
      *
      * @param partitions The list of partitions to assign this consumer
      * @throws IllegalArgumentException If partitions is null or contains null or empty topics
+     * @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern
+     *                               (without a subsequent call to {@link #unsubscribe()})
      */
     @Override
     public void assign(Collection<TopicPartition> partitions) {
@@ -1289,6 +1300,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+     *
+     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
+     *                                  or if provided offset is negative
      */
     @Override
     public void seek(TopicPartition partition, long offset) {
@@ -1307,11 +1321,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the
      * first offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
-     * If no partition is provided, seek to the first offset for all of the currently assigned partitions.
+     * If no partitions are provided, seek to the first offset for all of the currently assigned partitions.
+     *
+     * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
      */
     public void seekToBeginning(Collection<TopicPartition> partitions) {
         acquireAndEnsureOpen();
         try {
+            if (partitions == null) {
+                throw new IllegalArgumentException("Partitions collection cannot be null");
+            }
             Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
                 log.debug("Seeking to beginning of partition {}", tp);
@@ -1325,14 +1344,19 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the
      * final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
-     * If no partition is provided, seek to the final offset for all of the currently assigned partitions.
-     *
-     * If <code>isolation.level=read_committed</code>, the end offset will be the Last Stable Offset, ie. the offset
+     * If no partitions are provided, seek to the final offset for all of the currently assigned partitions.
+     * <p>
+     * If {@code isolation.level=read_committed}, the end offset will be the Last Stable Offset, i.e., the offset
      * of the first message with an open transaction.
+     *
+     * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
      */
     public void seekToEnd(Collection<TopicPartition> partitions) {
         acquireAndEnsureOpen();
         try {
+            if (partitions == null) {
+                throw new IllegalArgumentException("Partitions collection cannot be null");
+            }
             Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
                 log.debug("Seeking to end of partition {}", tp);
@@ -1348,6 +1372,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param partition The partition to get the position for
      * @return The offset
+     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
      * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
      *             the partition
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
@@ -1471,6 +1496,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Note that this method does not affect partition subscription. In particular, it does not cause a group
      * rebalance when automatic assignment is used.
      * @param partitions The partitions which should be paused
+     * @throws IllegalStateException if one of the provided partitions is not assigned to this consumer
      */
     @Override
     public void pause(Collection<TopicPartition> partitions) {
@@ -1490,6 +1516,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * {@link #poll(long)} will return records from these partitions if there are any to be fetched.
      * If the partitions were not previously paused, this method is a no-op.
      * @param partitions The partitions which should be resumed
+     * @throws IllegalStateException if one of the provided partitions is not assigned to this consumer
      */
     @Override
     public void resume(Collection<TopicPartition> partitions) {
@@ -1534,6 +1561,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
      *         such message.
      * @throws IllegalArgumentException if the target timestamp is negative.
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+     *         expiration of the configured request timeout
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up
      *         the offsets by timestamp.
      */
@@ -1564,6 +1593,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param partitions the partitions to get the earliest offsets.
      * @return The earliest available offsets for the given partitions
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+     *         expiration of the configured request timeout
      */
     @Override
     public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
@@ -1581,16 +1612,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <p>
      * Notice that this method may block indefinitely if the partition does not exist.
      * This method does not change the current consumer position of the partitions.
-     * </p>
-     *
-     * <p>When <code>isolation.level=read_committed</code> the last offset will be the Last Stable Offset (LSO).
+     * <p>
+     * When {@code isolation.level=read_committed} the last offset will be the Last Stable Offset (LSO).
      * This is the offset of the first message with an open transaction. The LSO moves forward as transactions
-     * are completed.</p>
+     * are completed.
      *
      * @see #seekToEnd(Collection)
      *
      * @param partitions the partitions to get the end offsets.
      * @return The end offsets for the given partitions.
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+     *         expiration of the configured request timeout
      */
     @Override
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
@@ -1619,7 +1651,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Tries to close the consumer cleanly within the specified timeout. This method waits up to
-     * <code>timeout</code> for the consumer to complete pending commits and leave the group.
+     * {@code timeout} for the consumer to complete pending commits and leave the group.
      * If auto-commit is enabled, this will commit the current offsets if possible within the
      * timeout. If the consumer is unable to complete offset commits and gracefully leave the group
      * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
@@ -1627,9 +1659,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
      *                non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
-     * @param timeUnit The time unit for the <code>timeout</code>
+     * @param timeUnit The time unit for the {@code timeout}
      * @throws InterruptException If the thread is interrupted before or while this function is called
-     * @throws IllegalArgumentException If the <code>timeout</code> is negative.
+     * @throws IllegalArgumentException If the {@code timeout} is negative.
      */
     public void close(long timeout, TimeUnit timeUnit) {
         if (timeout < 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java
index 929b6b9..be840db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.KafkaException;
 
 /**
  * This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at
- * which data can be sent for long enough for the alloted buffer to be exhausted.
+ * which data can be sent for long enough for the allocated buffer to be exhausted.
  */
 public class BufferExhaustedException extends KafkaException {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 7d51640..7dcec5c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -20,7 +20,9 @@ import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.clients.producer.internals.ProducerMetrics;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
@@ -564,12 +566,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     }
 
     /**
-     * Sends a list of consumed offsets to the consumer group coordinator, and also marks
+     * Sends a list of specified offsets to the consumer group coordinator, and also marks
      * those offsets as part of the current transaction. These offsets will be considered
-     * consumed only if the transaction is committed successfully.
-     *
+     * committed only if the transaction is committed successfully. The committed offset should
+     * be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
+     * <p>
      * This method should be used when you need to batch consumed and produced messages
-     * together, typically in a consume-transform-produce pattern.
+     * together, typically in a consume-transform-produce pattern. Thus, the specified
+     * {@code consumerGroupId} should be the same as config parameter {@code group.id} of the used
+     * {@link KafkaConsumer consumer}. Note that the consumer should have {@code enable.auto.commit=false}
+     * and should also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
+     * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
      *
      * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
      * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 1e77633..4982033 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -63,43 +63,38 @@ public interface Producer<K, V> extends Closeable {
     void abortTransaction() throws ProducerFencedException;
 
     /**
-     * Send the given record asynchronously and return a future which will eventually contain the response information.
-     *
-     * @param record The record to send
-     * @return A future which will eventually contain the response information
+     * See {@link KafkaProducer#send(ProducerRecord)}
      */
-    public Future<RecordMetadata> send(ProducerRecord<K, V> record);
+    Future<RecordMetadata> send(ProducerRecord<K, V> record);
 
     /**
-     * Send a record and invoke the given callback when the record has been acknowledged by the server
+     * See {@link KafkaProducer#send(ProducerRecord, Callback)}
      */
-    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
+    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
 
     /**
-     * Flush any accumulated records from the producer. Blocks until all sends are complete.
+     * See {@link KafkaProducer#flush()}
      */
-    public void flush();
+    void flush();
 
     /**
-     * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
-     * over time so this list should not be cached.
+     * See {@link KafkaProducer#partitionsFor(String)}
      */
-    public List<PartitionInfo> partitionsFor(String topic);
+    List<PartitionInfo> partitionsFor(String topic);
 
     /**
-     * Return a map of metrics maintained by the producer
+     * See {@link KafkaProducer#metrics()}
      */
-    public Map<MetricName, ? extends Metric> metrics();
+    Map<MetricName, ? extends Metric> metrics();
 
     /**
-     * Close this producer
+     * See {@link KafkaProducer#close()}
      */
-    public void close();
+    void close();
 
     /**
-     * Tries to close the producer cleanly within the specified timeout. If the close does not complete within the
-     * timeout, fail any pending send requests and force close the producer.
+     * See {@link KafkaProducer#close(long, TimeUnit)}
      */
-    public void close(long timeout, TimeUnit unit);
+    void close(long timeout, TimeUnit unit);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index 1cd5b24..ef53b89 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -29,7 +29,6 @@ public final class KafkaMetric implements Metric {
     private MetricConfig config;
 
     KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
-        super();
         this.metricName = metricName;
         this.lock = lock;
         this.measurable = measurable;

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 7ee5424..9cf0e5c 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -22,11 +22,9 @@ import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 import joptsimple.OptionSpecBuilder;
-
 import kafka.admin.AdminClient;
 import kafka.admin.TopicCommand;
 import kafka.utils.ZkUtils;
-
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b31a3e3..5aec3c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -40,6 +41,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@@ -52,7 +54,6 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.StreamsMetadata;
-import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
 import org.apache.kafka.streams.state.internals.StateStoreProvider;
@@ -328,6 +329,7 @@ public class KafkaStreams {
      * An app can set a single {@link KafkaStreams.StateListener} so that the app is notified when state changes.
      *
      * @param listener a new state listener
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
      */
     public void setStateListener(final KafkaStreams.StateListener listener) {
         if (state == State.CREATED) {
@@ -342,6 +344,7 @@ public class KafkaStreams {
      * terminates due to an uncaught exception.
      *
      * @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
      */
     public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
         if (state == State.CREATED) {
@@ -362,6 +365,7 @@ public class KafkaStreams {
      * processing.
      *
      * @param globalStateRestoreListener The listener triggered when {@link StateStore} is being restored.
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
      */
     public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener) {
         if (state == State.CREATED) {
@@ -491,6 +495,7 @@ public class KafkaStreams {
      *
      * @param topology the topology specifying the computational logic
      * @param props   properties for {@link StreamsConfig}
+     * @throws StreamsException if any fatal error occurs
      */
     public KafkaStreams(final Topology topology,
                         final Properties props) {
@@ -502,6 +507,7 @@ public class KafkaStreams {
      *
      * @param topology the topology specifying the computational logic
      * @param config  the Kafka Streams configuration
+     * @throws StreamsException if any fatal error occurs
      */
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config) {
@@ -515,6 +521,7 @@ public class KafkaStreams {
      * @param config         the Kafka Streams configuration
      * @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients
      *                       for the new {@code KafkaStreams} instance
+     * @throws StreamsException if any fatal error occurs
      */
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config,
@@ -524,69 +531,33 @@ public class KafkaStreams {
 
     private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                          final StreamsConfig config,
-                         final KafkaClientSupplier clientSupplier) {
-        // create the metrics
-        final Time time = Time.SYSTEM;
-
-        processId = UUID.randomUUID();
-
+                         final KafkaClientSupplier clientSupplier) throws StreamsException {
         this.config = config;
 
         // The application ID is a required config and hence should always have value
         final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-
-        internalTopologyBuilder.setApplicationId(applicationId);
+        processId = UUID.randomUUID();
 
         String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
-        if (clientId.length() <= 0)
+        if (clientId.length() <= 0) {
             clientId = applicationId + "-" + processId;
+        }
 
-        this.logPrefix = String.format("stream-client [%s] ", clientId);
-
+        this.logPrefix = String.format("stream-client [%s]", clientId);
         final LogContext logContext = new LogContext(logPrefix);
-
         this.log = logContext.logger(getClass());
+        final String cleanupThreadName = clientId + "-CleanupThread";
 
-        final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
-            MetricsReporter.class);
-        reporters.add(new JmxReporter(JMX_PREFIX));
-
-        final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
-            .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
-            .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                TimeUnit.MILLISECONDS);
-
-        metrics = new Metrics(metricConfig, reporters, time);
-
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
-        final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
-        GlobalStreamThread.State globalThreadState = null;
-
-        final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
-        streamsMetadataState = new StreamsMetadataState(internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
-
-        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        internalTopologyBuilder.setApplicationId(applicationId);
+        // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
+        internalTopologyBuilder.build(null);
 
-        if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
+        long cacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
+        if (cacheSize < 0) {
+            cacheSize = 0;
             log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
         }
 
-        final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
-            (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1)));
-
-        stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
-        if (globalTaskTopology != null) {
-            final String globalThreadId = clientId + "-GlobalStreamThread";
-            globalStreamThread = new GlobalStreamThread(globalTaskTopology,
-                                                        config,
-                                                        clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
-                                                        stateDirectory,
-                                                        metrics,
-                                                        time,
-                                                        globalThreadId);
-            globalThreadState = globalStreamThread.state();
-        }
-
         final StateRestoreListener delegatingStateRestoreListener = new StateRestoreListener() {
             @Override
             public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
@@ -610,6 +581,44 @@ public class KafkaStreams {
             }
         };
 
+        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        try {
+            stateDirectory = new StateDirectory(
+                applicationId,
+                config.getString(StreamsConfig.STATE_DIR_CONFIG),
+                Time.SYSTEM);
+        } catch (final ProcessorStateException fatal) {
+            throw new StreamsException(fatal);
+        }
+        streamsMetadataState = new StreamsMetadataState(
+            internalTopologyBuilder,
+            parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+
+        final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+            .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+            .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+                TimeUnit.MILLISECONDS);
+        final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+            MetricsReporter.class);
+        reporters.add(new JmxReporter(JMX_PREFIX));
+        metrics = new Metrics(metricConfig, reporters, Time.SYSTEM);
+
+        GlobalStreamThread.State globalThreadState = null;
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        if (globalTaskTopology != null) {
+            final String globalThreadId = clientId + "-GlobalStreamThread";
+            globalStreamThread = new GlobalStreamThread(globalTaskTopology,
+                                                        config,
+                                                        clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
+                                                        stateDirectory,
+                                                        metrics,
+                                                        Time.SYSTEM,
+                                                        globalThreadId);
+            globalThreadState = globalStreamThread.state();
+        }
+
+        final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
+        final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
         for (int i = 0; i < threads.length; i++) {
             threads[i] = StreamThread.create(internalTopologyBuilder,
                                              config,
@@ -617,14 +626,15 @@ public class KafkaStreams {
                                              processId,
                                              clientId,
                                              metrics,
-                                             time,
+                                             Time.SYSTEM,
                                              streamsMetadataState,
-                                             cacheSizeBytes,
+                                             cacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1)),
                                              stateDirectory,
                                              delegatingStateRestoreListener);
             threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
+
         final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
         if (globalTaskTopology != null) {
             globalStreamThread.setStateListener(streamStateListener);
@@ -635,7 +645,6 @@ public class KafkaStreams {
 
         final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
-        final String cleanupThreadName = clientId + "-CleanupThread";
         stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
             @Override
             public Thread newThread(final Runnable r) {
@@ -690,7 +699,8 @@ public class KafkaStreams {
      * {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}.
 
      * @throws IllegalStateException if process was already started
-     * @throws StreamsException if the Kafka brokers have version 0.10.0.x
+     * @throws StreamsException if the Kafka brokers have version 0.10.0.x or
+     *                          if {@link StreamsConfig#PROCESSING_GUARANTEE_CONFIG exactly-once} is enabled for pre 0.11.0.x brokers
      */
     public synchronized void start() throws IllegalStateException, StreamsException {
         log.debug("Starting Streams client");
@@ -858,7 +868,7 @@ public class KafkaStreams {
      * <p>
      * Calling this method triggers a restore of local {@link StateStore}s on the next {@link #start() application start}.
      *
-     * @throws IllegalStateException if the instance is currently running
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is currently {@link State#RUNNING running}
      */
     public void cleanUp() {
         if (isRunning()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 446f941..cde416d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -342,10 +342,10 @@ public class StreamsConfig extends AbstractConfig {
                     Importance.MEDIUM,
                     CLIENT_ID_DOC)
             .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
-                Type.CLASS,
-                LogAndFailExceptionHandler.class.getName(),
-                Importance.MEDIUM,
-                DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+                    Type.CLASS,
+                    LogAndFailExceptionHandler.class.getName(),
+                    Importance.MEDIUM,
+                    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
             .define(DEFAULT_KEY_SERDE_CLASS_CONFIG,
                     Type.CLASS,
                     Serdes.ByteArraySerde.class.getName(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 3c8e077..9e853fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -46,12 +46,11 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     final StateManager stateManager;
 
     public AbstractProcessorContext(final TaskId taskId,
-                             final String applicationId,
-                             final StreamsConfig config,
-                             final StreamsMetrics metrics,
-                             final StateManager stateManager,
-                             final ThreadCache cache) {
-
+                                    final String applicationId,
+                                    final StreamsConfig config,
+                                    final StreamsMetrics metrics,
+                                    final StateManager stateManager,
+                                    final ThreadCache cache) {
         this.taskId = taskId;
         this.applicationId = applicationId;
         this.config = config;
@@ -93,7 +92,9 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     }
 
     @Override
-    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {
+    public void register(final StateStore store,
+                         final boolean loggingEnabled,
+                         final StateRestoreCallback stateRestoreCallback) {
         if (initialized) {
             throw new IllegalStateException("Can only create state stores during initialization.");
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 5ed9aae..6734da6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -226,6 +226,7 @@ public abstract class AbstractTask implements Task {
      * @throws ProcessorStateException if there is an error while closing the state manager
      * @param writeCheckpoint boolean indicating if a checkpoint file should be written
      */
+    // visible for testing
     void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
         ProcessorStateException exception = null;
         log.trace("Closing state manager");

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 2d886b7..3208f93 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -137,7 +137,7 @@ class AssignedTasks {
         restoredPartitions.addAll(restored);
         for (final Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
             final Map.Entry<TaskId, Task> entry = it.next();
-            Task task = entry.getValue();
+            final Task task = entry.getValue();
             if (restoredPartitions.containsAll(task.changelogPartitions())) {
                 transitionToRunning(task);
                 resume.addAll(task.partitions());
@@ -303,11 +303,12 @@ class AssignedTasks {
         builder.append("\n");
     }
 
-    private List<Task> allInitializedTasks() {
+    private List<Task> allTasks() {
         final List<Task> tasks = new ArrayList<>();
         tasks.addAll(running.values());
         tasks.addAll(suspended.values());
         tasks.addAll(restoring.values());
+        tasks.addAll(created.values());
         return tasks;
     }
 
@@ -428,8 +429,7 @@ class AssignedTasks {
     }
 
     void close(final boolean clean) {
-        close(allInitializedTasks(), clean);
-        close(created.values(), clean);
+        close(allTasks(), clean);
         clear();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index d47af88..81d2f6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1744,7 +1744,7 @@ public class InternalTopologyBuilder {
 
         private String subtopologiesAsString() {
             final StringBuilder sb = new StringBuilder();
-            sb.append("Sub-topologies: \n");
+            sb.append("Sub-topologies:\n");
             if (subtopologies.isEmpty()) {
                 sb.append("  none\n");
             } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index d26511c..889b6d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -42,7 +42,6 @@ public class RecordQueue {
     private final ArrayDeque<StampedRecord> fifoQueue;
     private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
     private final SourceNodeRecordDeserializer recordDeserializer;
-    private final DeserializationExceptionHandler deserializationExceptionHandler;
     private final ProcessorContext processorContext;
 
     private long partitionTime = TimestampTracker.NOT_KNOWN;
@@ -58,11 +57,9 @@ public class RecordQueue {
         this.fifoQueue = new ArrayDeque<>();
         this.timeTracker = new MinTimestampTracker<>();
         this.recordDeserializer = new SourceNodeRecordDeserializer(source, deserializationExceptionHandler);
-        this.deserializationExceptionHandler = deserializationExceptionHandler;
         this.processorContext = processorContext;
     }
 
-
     /**
      * Returns the corresponding source node in the topology
      *
@@ -87,15 +84,15 @@ public class RecordQueue {
      * @param rawRecords the raw records
      * @return the size of this queue
      */
-    public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
-        for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+    int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
+        for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
 
-            ConsumerRecord<Object, Object> record = recordDeserializer.tryDeserialize(processorContext, rawRecord);
+            final ConsumerRecord<Object, Object> record = recordDeserializer.tryDeserialize(processorContext, rawRecord);
             if (record == null) {
                 continue;
             }
 
-            long timestamp = timestampExtractor.extract(record, timeTracker.get());
+            final long timestamp = timestampExtractor.extract(record, timeTracker.get());
             log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record);
 
             // drop message if TS is invalid, i.e., negative
@@ -103,7 +100,7 @@ public class RecordQueue {
                 continue;
             }
 
-            StampedRecord stampedRecord = new StampedRecord(record, timestamp);
+            final StampedRecord stampedRecord = new StampedRecord(record, timestamp);
             fifoQueue.addLast(stampedRecord);
             timeTracker.addElement(stampedRecord);
         }
@@ -111,10 +108,11 @@ public class RecordQueue {
         // update the partition timestamp if its currently
         // tracked min timestamp has exceed its value; this will
         // usually only take effect for the first added batch
-        long timestamp = timeTracker.get();
+        final long timestamp = timeTracker.get();
 
-        if (timestamp > partitionTime)
+        if (timestamp > partitionTime) {
             partitionTime = timestamp;
+        }
 
         return size();
     }
@@ -125,19 +123,21 @@ public class RecordQueue {
      * @return StampedRecord
      */
     public StampedRecord poll() {
-        StampedRecord elem = fifoQueue.pollFirst();
+        final StampedRecord elem = fifoQueue.pollFirst();
 
-        if (elem == null)
+        if (elem == null) {
             return null;
+        }
 
         timeTracker.removeElement(elem);
 
         // only advance the partition timestamp if its currently
         // tracked min timestamp has exceeded its value
-        long timestamp = timeTracker.get();
+        final long timestamp = timeTracker.get();
 
-        if (timestamp > partitionTime)
+        if (timestamp > partitionTime) {
             partitionTime = timestamp;
+        }
 
         return elem;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
index 1d9e722..7fde881 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
@@ -63,12 +63,12 @@ class SourceNodeRecordDeserializer implements RecordDeserializer {
     }
 
     public ConsumerRecord<Object, Object> tryDeserialize(final ProcessorContext processorContext,
-                                                         ConsumerRecord<byte[], byte[]> rawRecord) {
+                                                         final ConsumerRecord<byte[], byte[]> rawRecord) {
 
         // catch and process if we have a deserialization handler
         try {
             return deserialize(rawRecord);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             final DeserializationExceptionHandler.DeserializationHandlerResponse response =
                     deserializationExceptionHandler.handle(processorContext, rawRecord, e);
             if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 033af24..98ec810 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -63,6 +63,15 @@ public class StandbyTask extends AbstractTask {
         processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
     }
 
+    @Override
+    public boolean initialize() {
+        initializeStateStores();
+        checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
+        processorContext.initialized();
+        taskInitialized = true;
+        return true;
+    }
+
     /**
      * <pre>
      * - update offset limits
@@ -139,16 +148,6 @@ public class StandbyTask extends AbstractTask {
     }
 
     @Override
-    public boolean maybePunctuateStreamTime() {
-        throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask");
-    }
-
-    @Override
-    public boolean maybePunctuateSystemTime() {
-        throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask");
-    }
-
-    @Override
     public boolean commitNeeded() {
         return false;
     }
@@ -174,16 +173,18 @@ public class StandbyTask extends AbstractTask {
     }
 
     @Override
-    public boolean process() {
-        throw new UnsupportedOperationException("process not supported by StandbyTasks");
+    public boolean maybePunctuateStreamTime() {
+        throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask");
     }
 
-    public boolean initialize() {
-        initializeStateStores();
-        checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
-        processorContext.initialized();
-        taskInitialized = true;
-        return true;
+    @Override
+    public boolean maybePunctuateSystemTime() {
+        throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask");
+    }
+
+    @Override
+    public boolean process() {
+        throw new UnsupportedOperationException("process not supported by StandbyTasks");
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 8ecc7e2..caa0100 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -185,7 +185,9 @@ public class StoreChangelogReader implements ChangelogReader {
         needsRestoring.putAll(initialized);
     }
 
-    private void logRestoreOffsets(final TopicPartition partition, final long startingOffset, final Long endOffset) {
+    private void logRestoreOffsets(final TopicPartition partition,
+                                   final long startingOffset,
+                                   final Long endOffset) {
         log.debug("Restoring partition {} from offset {} to endOffset {}",
                   partition,
                   startingOffset,
@@ -229,7 +231,7 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
-                                    final TopicPartition topicPartition) {
+                                  final TopicPartition topicPartition) {
         final StateRestorer restorer = stateRestorers.get(topicPartition);
         final Long endOffset = endOffsets.get(topicPartition);
         final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
@@ -255,7 +257,8 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
-                             final StateRestorer restorer, final Long endOffset) {
+                             final StateRestorer restorer,
+                             final Long endOffset) {
         final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
         long nextPosition = -1;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 084a991..0830aa2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -131,7 +131,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         // initialize the topology with its own context
         processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
 
-        final TimestampExtractor defaultTimestampExtractor  = config.defaultTimestampExtractor();
+        final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
         final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
         for (final TopicPartition partition : partitions) {
             final SourceNode source = topology.source(partition.topic());
@@ -151,6 +151,16 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
     }
 
+    public boolean initialize() {
+        log.debug("Initializing");
+        initializeStateStores();
+        initTopology();
+        processorContext.initialized();
+        taskInitialized = true;
+        return topology.stateStores().isEmpty();
+    }
+
+
     /**
      * <pre>
      * - re-initialize the task
@@ -597,13 +607,4 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         return new RecordCollectorImpl(producer, id.toString(), logContext);
     }
 
-    public boolean initialize() {
-        log.debug("Initializing");
-        initializeStateStores();
-        initTopology();
-        processorContext.initialized();
-        taskInitialized = true;
-        return topology.stateStores().isEmpty();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 867359b..e141c46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -421,7 +421,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             }
 
             return threadProducer;
-
         }
 
         @Override
@@ -456,20 +455,29 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         }
 
         @Override
-        StandbyTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) {
+        StandbyTask createTask(final Consumer<byte[], byte[]> consumer,
+                               final TaskId taskId,
+                               final Set<TopicPartition> partitions) {
             taskCreatedSensor.record();
 
             final ProcessorTopology topology = builder.build(taskId.topicGroupId);
 
             if (!topology.stateStores().isEmpty()) {
-                return new StandbyTask(taskId, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
+                return new StandbyTask(taskId,
+                                       applicationId,
+                                       partitions,
+                                       topology,
+                                       consumer,
+                                       storeChangelogReader,
+                                       config,
+                                       streamsMetrics,
+                                       stateDirectory);
             } else {
-                log.trace("Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", taskId, partitions);
-
+                log.trace("Skipped standby task {} with assigned partitions {} " +
+                    "since it does not have any state stores to materialize", taskId, partitions);
                 return null;
             }
         }
-
     }
 
     /**
@@ -781,8 +789,10 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                 final long processLatency = computeLatency();
                 streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed,
                                                         timerStartedMs);
-                processedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, totalProcessed,
-                                                                                  processLatency, commitTimeMs);
+                processedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit,
+                                                                           totalProcessed,
+                                                                           processLatency,
+                                                                           commitTimeMs);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 7afbecf..f12ed91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -110,7 +110,7 @@ class TaskManager {
                         newTasks.put(taskId, partitions);
                     }
                 } catch (final StreamsException e) {
-                    log.error("Failed to create an active task {} due to the following error:", taskId, e);
+                    log.error("Failed to resume an active task {} due to the following error:", taskId, e);
                     throw e;
                 }
             } else {
@@ -122,6 +122,7 @@ class TaskManager {
             return;
         }
 
+        // CANNOT FIND RETRY AND BACKOFF LOGIC
         // create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
         // -> other thread will call removeSuspendedTasks(); eventually
         log.trace("New active tasks to be created: {}", newTasks);
@@ -185,24 +186,13 @@ class TaskManager {
         firstException.compareAndSet(null, active.suspend());
         firstException.compareAndSet(null, standby.suspend());
         // remove the changelog partitions from restore consumer
-        firstException.compareAndSet(null, unAssignChangeLogPartitions());
+        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
 
         if (firstException.get() != null) {
             throw new StreamsException(logPrefix + "failed to suspend stream tasks", firstException.get());
         }
     }
 
-    private RuntimeException unAssignChangeLogPartitions() {
-        try {
-            // un-assign the change log partitions
-            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
-        } catch (final RuntimeException e) {
-            log.error("Failed to un-assign change log partitions due to the following error:", e);
-            return e;
-        }
-        return null;
-    }
-
     void shutdown(final boolean clean) {
         log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(),
                   active.previousTaskIds(), standby.previousTaskIds());
@@ -215,9 +205,9 @@ class TaskManager {
             log.error("Failed to close KafkaStreamClient due to the following error:", e);
         }
         // remove the changelog partitions from restore consumer
-        unAssignChangeLogPartitions();
+        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
         taskCreator.close();
-
+        standbyTaskCreator.close();
     }
 
     Set<TaskId> suspendedActiveTaskIds() {