Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/28 10:00:35 UTC

[2/2] kafka git commit: KAFKA-5949; User Callback Exceptions need to be handled properly

KAFKA-5949; User Callback Exceptions need to be handled properly

 - catch user exceptions thrown from user callbacks (TimestampExtractor, DeserializationExceptionHandler, StateRestoreListener) and wrap them in a StreamsException (see the sketch after the cleanup notes below)

Additional cleanup:
 - rename globalRestoreListener to userRestoreListener
 - remove unnecessary interface: collapse SourceNodeRecordDeserializer and RecordDeserializer into a single class
 - remove unused parameter loggingEnabled from StateManager#register (the parameter is now deprecated and ignored in ProcessorContext#register)
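
The wrapping pattern applied throughout the diff below is, at its core, the following (a minimal
sketch, lifted from the RecordQueue change further down; an exception that is already a
StreamsException is rethrown as-is to avoid double-wrapping):

    final long timestamp;
    try {
        timestamp = timestampExtractor.extract(record, timeTracker.get());
    } catch (final StreamsException internalFatalExtractorException) {
        // already a StreamsException -- rethrow unchanged
        throw internalFatalExtractorException;
    } catch (final Exception fatalUserException) {
        // any other exception escaping user code is wrapped, so the thread
        // always dies with a well-defined exception type
        throw new StreamsException(
            String.format("Fatal user code error in TimestampExtractor callback for record %s.", record),
            fatalUserException);
    }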

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>

Closes #3939 from mjsax/kafka-5949-exceptions-user-callbacks


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e5f2471c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e5f2471c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e5f2471c

Branch: refs/heads/trunk
Commit: e5f2471c548fc490a42dd0321bcf7fcdd4ddc52d
Parents: 2703fda
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Sep 28 11:00:31 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Sep 28 11:00:31 2017 +0100

----------------------------------------------------------------------
 .../errors/LogAndContinueExceptionHandler.java  |   6 +-
 .../errors/LogAndFailExceptionHandler.java      |   8 +-
 .../kafka/streams/kstream/ValueTransformer.java |   2 +
 .../internals/KStreamTransformValues.java       |   6 +-
 .../streams/processor/ProcessorContext.java     |  11 +-
 .../kafka/streams/processor/StateStore.java     |   4 +
 .../internals/AbstractProcessorContext.java     |   4 +-
 .../processor/internals/AbstractTask.java       |   6 +-
 .../processor/internals/AssignedTasks.java      |   5 +
 .../internals/CompositeRestoreListener.java     |  52 +++++++--
 .../processor/internals/GlobalStateManager.java |   6 +
 .../internals/GlobalStateManagerImpl.java       |   1 -
 .../internals/GlobalStateUpdateTask.java        |  21 ++--
 .../processor/internals/GlobalStreamThread.java |   7 +-
 .../internals/InternalTopologyBuilder.java      |   8 +-
 .../internals/ProcessorStateManager.java        |  13 +--
 .../processor/internals/RecordDeserializer.java |  70 +++++++++++-
 .../processor/internals/RecordQueue.java        |  28 +++--
 .../internals/SourceNodeRecordDeserializer.java |  90 ---------------
 .../processor/internals/StateManager.java       |   8 +-
 .../processor/internals/StateRestorer.java      |   4 +-
 .../internals/StoreChangelogReader.java         |   8 +-
 .../streams/processor/internals/StreamTask.java |   3 +-
 .../processor/internals/StreamThread.java       |  12 +-
 .../kafka/streams/processor/internals/Task.java |   3 +
 .../processor/internals/TaskManager.java        |   5 +-
 .../state/internals/InMemoryKeyValueStore.java  |   2 +-
 .../streams/state/internals/MemoryLRUCache.java |   2 +-
 .../streams/processor/TopologyBuilderTest.java  |   2 +-
 .../internals/CompositeRestoreListenerTest.java |   8 +-
 .../internals/GlobalStateManagerImplTest.java   |  42 +++----
 .../internals/GlobalStateTaskTest.java          |  20 +++-
 .../internals/InternalTopologyBuilderTest.java  |   3 +
 .../processor/internals/PartitionGroupTest.java |  18 ++-
 .../internals/ProcessorStateManagerTest.java    |  50 ++++-----
 .../internals/RecordDeserializerTest.java       |  98 ++++++++++++++++
 .../processor/internals/RecordQueueTest.java    |  32 ++++--
 .../SourceNodeRecordDeserializerTest.java       | 111 -------------------
 .../processor/internals/StandbyTaskTest.java    |   2 +-
 .../processor/internals/StateManagerStub.java   |   2 +-
 .../processor/internals/StateRestorerTest.java  |   2 +-
 .../internals/StoreChangelogReaderTest.java     |   2 +-
 .../internals/StreamPartitionAssignorTest.java  |  15 ++-
 .../processor/internals/StreamTaskTest.java     |   4 +-
 .../kafka/test/GlobalStateManagerStub.java      |   2 +-
 .../apache/kafka/test/MockProcessorContext.java |   4 +-
 .../kafka/test/MockStateStoreSupplier.java      |  39 +++----
 .../apache/kafka/test/NoOpProcessorContext.java |   4 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   5 +-
 49 files changed, 478 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
index dde4b52..b2ef45b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
@@ -38,9 +38,9 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionH
                                                  final Exception exception) {
 
         log.warn("Exception caught during Deserialization, " +
-                        "taskId: {}, topic: {}, partition: {}, offset: {}",
-                context.taskId(), record.topic(), record.partition(), record.offset(),
-                exception);
+                 "taskId: {}, topic: {}, partition: {}, offset: {}",
+                 context.taskId(), record.topic(), record.partition(), record.offset(),
+                 exception);
 
         return DeserializationHandlerResponse.CONTINUE;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
index 23557a3..60af32f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
@@ -37,10 +37,10 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandl
                                                  final ConsumerRecord<byte[], byte[]> record,
                                                  final Exception exception) {
 
-        log.warn("Exception caught during Deserialization, " +
-                        "taskId: {}, topic: {}, partition: {}, offset: {}",
-                context.taskId(), record.topic(), record.partition(), record.offset(),
-                exception);
+        log.error("Exception caught during Deserialization, " +
+                  "taskId: {}, topic: {}, partition: {}, offset: {}",
+                  context.taskId(), record.topic(), record.partition(), record.offset(),
+                  exception);
 
         return DeserializationHandlerResponse.FAIL;
     }
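
The handler whose log level changes above is selected via StreamsConfig; a minimal sketch
(application id and bootstrap servers are illustrative, other required settings omitted):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");               // illustrative
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // illustrative
    // skip corrupt records instead of failing the thread
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
              LogAndContinueExceptionHandler.class);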

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 5463a76..0a8e890 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -62,6 +62,8 @@ public interface ValueTransformer<V, VR> {
      * {@code ValueTransformer} and will result in an {@link StreamsException exception}.
      *
      * @param context the context
+     * @throws IllegalStateException if the store is registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
      */
     void init(final ProcessorContext context);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index ab1c302..55c16cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -91,8 +91,10 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
                     }
 
                     @Override
-                    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {
-                        context.register(store, loggingEnabled, stateRestoreCallback);
+                    public void register(final StateStore store,
+                                         final boolean deprecatedAndIgnoredLoggingEnabled,
+                                         final StateRestoreCallback stateRestoreCallback) {
+                        context.register(store, deprecatedAndIgnoredLoggingEnabled, stateRestoreCallback);
                     }
 
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index cdf1612..385d641 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
 
 import java.io.File;
 import java.util.Map;
@@ -75,8 +76,14 @@ public interface ProcessorContext {
      * Registers and possibly restores the specified storage engine.
      *
      * @param store the storage engine
-     */
-    void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback);
+     * @param loggingEnabledIsDeprecatedAndIgnored deprecated parameter {@code loggingEnabled} is ignored:
+     *                                             if you want to enable logging on a state store, call
+     *                                             {@link org.apache.kafka.streams.state.StoreBuilder#withLoggingEnabled(Map)}
+     *                                             when creating the store
+     * @throws IllegalStateException if the store is registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback);
 
     /**
      * Get the state store given the store name.
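
As the new javadoc states, logging is now configured when the store is built rather than at
registration time; a minimal sketch of the StoreBuilder route (store name and changelog topic
config are illustrative):

    import java.util.Collections;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    final StoreBuilder<KeyValueStore<String, Long>> countsStoreBuilder =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("counts"),
                                    Serdes.String(),
                                    Serdes.Long())
              // enable the changelog, with illustrative topic config overrides
              .withLoggingEnabled(Collections.singletonMap("retention.ms", "86400000"));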

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 3925951..cb8139c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.streams.errors.StreamsException;
+
 /**
  * A storage engine for managing state maintained by a stream processor.
  *
@@ -36,6 +38,8 @@ public interface StateStore {
 
     /**
      * Initializes this state store
+     * @throws IllegalStateException if the store is registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
      */
     void init(ProcessorContext context, StateStore root);
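
The documented exceptions originate from ProcessorContext#register, which a store's init()
typically calls; a minimal sketch of that call site (the restore callback body is illustrative):

    // inside a custom StateStore implementation -- illustrative
    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        // registering after context initialization has finished throws IllegalStateException;
        // a missing changelog partition surfaces as a StreamsException
        context.register(root, false, new StateRestoreCallback() {
            @Override
            public void restore(final byte[] key, final byte[] value) {
                // illustrative: apply one restored changelog entry to the store
            }
        });
    }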
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 9e853fd..410212e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -93,13 +93,13 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
 
     @Override
     public void register(final StateStore store,
-                         final boolean loggingEnabled,
+                         final boolean deprecatedAndIgnoredLoggingEnabled,
                          final StateRestoreCallback stateRestoreCallback) {
         if (initialized) {
             throw new IllegalStateException("Can only create state stores during initialization.");
         }
         Objects.requireNonNull(store, "store must not be null");
-        stateManager.register(store, loggingEnabled, stateRestoreCallback);
+        stateManager.register(store, stateRestoreCallback);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 6734da6..c24686e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -90,7 +90,7 @@ public abstract class AbstractTask implements Task {
                 topology.storeToChangelogTopic(),
                 changelogReader,
                 eosEnabled,
-                    logContext);
+                logContext);
         } catch (final IOException e) {
             throw new ProcessorStateException(String.format("%sError while creating the state manager", logPrefix), e);
         }
@@ -196,6 +196,10 @@ public abstract class AbstractTask implements Task {
         stateMgr.flush();
     }
 
+    /**
+     * @throws IllegalStateException if the store is registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
     void initializeStateStores() {
         if (topology.stateStores().isEmpty()) {
             return;

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index e51ebd7..fcb717d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 
@@ -107,6 +108,10 @@ class AssignedTasks {
         return partitions;
     }
 
+    /**
+     * @throws IllegalStateException if the store is registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
     void initializeNewTasks() {
         if (!created.isEmpty()) {
             log.debug("Initializing {}s {}", taskTypeName, created.keySet());

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
index 138be77..a1c2f7f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -32,7 +33,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
     public static final NoOpStateRestoreListener NO_OP_STATE_RESTORE_LISTENER = new NoOpStateRestoreListener();
     private final BatchingStateRestoreCallback internalBatchingRestoreCallback;
     private final StateRestoreListener storeRestoreListener;
-    private StateRestoreListener globalRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
+    private StateRestoreListener userRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
 
     CompositeRestoreListener(final StateRestoreCallback stateRestoreCallback) {
 
@@ -45,31 +46,66 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
         internalBatchingRestoreCallback = getBatchingRestoreCallback(stateRestoreCallback);
     }
 
+    /**
+     * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in
+     * {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)}
+     */
     @Override
     public void onRestoreStart(final TopicPartition topicPartition,
                                final String storeName,
                                final long startingOffset,
                                final long endingOffset) {
-        globalRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+        try {
+            userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+        } catch (final Exception fatalUserException) {
+            throw new StreamsException(
+                String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+                              storeName,
+                              topicPartition),
+                fatalUserException);
+        }
         storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
     }
 
+    /**
+     * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in
+     * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}
+     */
     @Override
     public void onBatchRestored(final TopicPartition topicPartition,
                                 final String storeName,
                                 final long batchEndOffset,
                                 final long numRestored) {
-        globalRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+        try {
+            userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+        } catch (final Exception fatalUserException) {
+            throw new StreamsException(
+                String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+                    storeName,
+                    topicPartition),
+                fatalUserException);
+        }
         storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
     }
 
+    /**
+     * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in
+     * {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}
+     */
     @Override
     public void onRestoreEnd(final TopicPartition topicPartition,
                              final String storeName,
                              final long totalRestored) {
-        globalRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+        try {
+            userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+        } catch (final Exception fatalUserException) {
+            throw new StreamsException(
+                String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+                    storeName,
+                    topicPartition),
+                fatalUserException);
+        }
         storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
-
     }
 
     @Override
@@ -77,9 +113,9 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
         internalBatchingRestoreCallback.restoreAll(records);
     }
 
-    void setGlobalRestoreListener(final StateRestoreListener globalRestoreListener) {
-        if (globalRestoreListener != null) {
-            this.globalRestoreListener = globalRestoreListener;
+    void setUserRestoreListener(final StateRestoreListener userRestoreListener) {
+        if (userRestoreListener != null) {
+            this.userRestoreListener = userRestoreListener;
         }
     }
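
The listener renamed here is the user-provided one registered on the KafkaStreams instance
(via setGlobalStateRestoreListener); a minimal sketch, with illustrative print statements:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.StateRestoreListener;

    final StateRestoreListener userListener = new StateRestoreListener() {
        @Override
        public void onRestoreStart(final TopicPartition partition, final String storeName,
                                   final long startingOffset, final long endingOffset) {
            System.out.printf("restoring %s %s: %d..%d%n", storeName, partition, startingOffset, endingOffset);
        }

        @Override
        public void onBatchRestored(final TopicPartition partition, final String storeName,
                                    final long batchEndOffset, final long numRestored) {
            // an exception thrown here is now wrapped into a StreamsException (see above)
        }

        @Override
        public void onRestoreEnd(final TopicPartition partition, final String storeName,
                                 final long totalRestored) {
            System.out.printf("restored %s %s: %d records%n", storeName, partition, totalRestored);
        }
    };
    streams.setGlobalStateRestoreListener(userListener);  // streams: an existing KafkaStreams instance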
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
index b058844..c9b8ca8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
@@ -16,8 +16,14 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
+
 import java.util.Set;
 
 public interface GlobalStateManager extends StateManager {
+    /**
+     * @throws IllegalStateException if the store is registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
     Set<String> initialize(InternalProcessorContext processorContext);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index d03425b..10a0775 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -120,7 +120,6 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     }
 
     public void register(final StateStore store,
-                         final boolean ignored,
                          final StateRestoreCallback stateRestoreCallback) {
 
         if (stores.containsKey(store.name())) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 4c2b40f..849af57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -18,7 +18,9 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.StreamsException;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -33,29 +35,34 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
     private final ProcessorTopology topology;
     private final InternalProcessorContext processorContext;
     private final Map<TopicPartition, Long> offsets = new HashMap<>();
-    private final Map<String, SourceNodeRecordDeserializer> deserializers = new HashMap<>();
+    private final Map<String, RecordDeserializer> deserializers = new HashMap<>();
     private final GlobalStateManager stateMgr;
     private final DeserializationExceptionHandler deserializationExceptionHandler;
-
+    private final LogContext logContext;
 
     public GlobalStateUpdateTask(final ProcessorTopology topology,
                                  final InternalProcessorContext processorContext,
                                  final GlobalStateManager stateMgr,
-                                 final DeserializationExceptionHandler deserializationExceptionHandler) {
-
+                                 final DeserializationExceptionHandler deserializationExceptionHandler,
+                                 final LogContext logContext) {
         this.topology = topology;
         this.stateMgr = stateMgr;
         this.processorContext = processorContext;
         this.deserializationExceptionHandler = deserializationExceptionHandler;
+        this.logContext = logContext;
     }
 
+    /**
+     * @throws IllegalStateException if the store is registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
     public Map<TopicPartition, Long> initialize() {
         final Set<String> storeNames = stateMgr.initialize(processorContext);
         final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();
         for (final String storeName : storeNames) {
             final String sourceTopic = storeNameToTopic.get(storeName);
             final SourceNode source = topology.source(sourceTopic);
-            deserializers.put(sourceTopic, new SourceNodeRecordDeserializer(source, deserializationExceptionHandler));
+            deserializers.put(sourceTopic, new RecordDeserializer(source, deserializationExceptionHandler, logContext));
         }
         initTopology();
         processorContext.initialized();
@@ -66,8 +73,8 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
     @SuppressWarnings("unchecked")
     @Override
     public void update(final ConsumerRecord<byte[], byte[]> record) {
-        final SourceNodeRecordDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
-        final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.tryDeserialize(processorContext, record);
+        final RecordDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
+        final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.deserialize(processorContext, record);
 
         if (deserialized != null) {
             final ProcessorRecordContext recordContext =

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index a365add..1ee49e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -220,6 +220,10 @@ public class GlobalStreamThread extends Thread {
             this.flushInterval = flushInterval;
         }
 
+        /**
+         * @throws IllegalStateException if the store is registered after initialization has already finished
+         * @throws StreamsException if the store's change log does not contain the partition
+         */
         void initialize() {
             final Map<TopicPartition, Long> partitionOffsets = stateMaintainer.initialize();
             consumer.assign(partitionOffsets.keySet());
@@ -312,7 +316,8 @@ public class GlobalStreamThread extends Thread {
                                                                           streamsMetrics,
                                                                           cache),
                                                                   stateMgr,
-                                                                  config.defaultDeserializationExceptionHandler()),
+                                                                  config.defaultDeserializationExceptionHandler(),
+                                                                  logContext),
                                         time,
                                         config.getLong(StreamsConfig.POLL_MS_CONFIG),
                                         config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 81d2f6c..06405ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -178,8 +178,10 @@ public class InternalTopologyBuilder {
     }
 
     private static class StateStoreSupplierFactory extends AbstractStateStoreFactory {
+        @SuppressWarnings("deprecation")
         private final StateStoreSupplier supplier;
 
+        @SuppressWarnings("deprecation")
         StateStoreSupplierFactory(final StateStoreSupplier<?> supplier) {
             super(supplier.name(),
                   supplier.loggingEnabled(),
@@ -495,6 +497,7 @@ public class InternalTopologyBuilder {
         nodeGrouper.unite(name, predecessorNames);
     }
 
+    @SuppressWarnings("deprecation")
     public final void addStateStore(final StateStoreSupplier supplier,
                                     final String... processorNames) {
         Objects.requireNonNull(supplier, "supplier can't be null");
@@ -527,6 +530,7 @@ public class InternalTopologyBuilder {
         }
     }
 
+    @SuppressWarnings("deprecation")
     public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
                                      final String sourceName,
                                      final TimestampExtractor timestampExtractor,
@@ -1612,8 +1616,8 @@ public class InternalTopologyBuilder {
             return Collections.unmodifiableSet(nodes);
         }
 
-        // only for testing
-        public Iterator<TopologyDescription.Node> nodesInOrder() {
+        // visible for testing
+        Iterator<TopologyDescription.Node> nodesInOrder() {
             return nodes.iterator();
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 2f16547..cc14c67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -41,8 +40,7 @@ import java.util.Map;
 
 public class ProcessorStateManager implements StateManager {
 
-
-    public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
+    private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
     private final Logger log;
@@ -119,17 +117,8 @@ public class ProcessorStateManager implements StateManager {
         return baseDir;
     }
 
-    /**
-     * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name
-     * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name)
-     *
-     * // TODO: parameter loggingEnabled can be removed now
-     *
-     * @throws StreamsException if the store's change log does not contain the partition
-     */
     @Override
     public void register(final StateStore store,
-                         final boolean loggingEnabled,
                          final StateRestoreCallback stateRestoreCallback) {
         log.debug("Registering state store {} to its state manager", store.name());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 4e04108..1b5f764 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -17,7 +17,73 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.slf4j.Logger;
 
-interface RecordDeserializer {
-    ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord);
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
+
+class RecordDeserializer {
+    private final SourceNode sourceNode;
+    private final DeserializationExceptionHandler deserializationExceptionHandler;
+    private final Logger log;
+
+    RecordDeserializer(final SourceNode sourceNode,
+                       final DeserializationExceptionHandler deserializationExceptionHandler,
+                       final LogContext logContext) {
+        this.sourceNode = sourceNode;
+        this.deserializationExceptionHandler = deserializationExceptionHandler;
+        this.log = logContext.logger(RecordDeserializer.class);
+    }
+
+    /**
+     * @throws StreamsException if a deserialization error occurs and the deserialization callback returns
+     * {@link DeserializationExceptionHandler.DeserializationHandlerResponse#FAIL FAIL} or throws an exception itself
+     */
+    @SuppressWarnings("deprecation")
+    ConsumerRecord<Object, Object> deserialize(final ProcessorContext processorContext,
+                                               final ConsumerRecord<byte[], byte[]> rawRecord) {
+
+        try {
+            return new ConsumerRecord<>(
+                rawRecord.topic(),
+                rawRecord.partition(),
+                rawRecord.offset(),
+                rawRecord.timestamp(),
+                TimestampType.CREATE_TIME,
+                rawRecord.checksum(),
+                rawRecord.serializedKeySize(),
+                rawRecord.serializedValueSize(),
+                sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()),
+                sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()));
+        } catch (final Exception deserializationException) {
+            final DeserializationExceptionHandler.DeserializationHandlerResponse response;
+            try {
+                response = deserializationExceptionHandler.handle(processorContext, rawRecord, deserializationException);
+            } catch (final Exception fatalUserException) {
+                log.error("Deserialization error callback failed after deserialization error for record {}",
+                          rawRecord,
+                          deserializationException);
+                throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
+            }
+
+            if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) {
+                throw new StreamsException("Deserialization exception handler is set to fail upon" +
+                    " a deserialization error. If you would rather have the streaming pipeline" +
+                    " continue after a deserialization error, please set the " +
+                    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
+                    deserializationException);
+            } else {
+                sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
+            }
+        }
+        return null;
+    }
+
+    SourceNode sourceNode() {
+        return sourceNode;
+    }
 }
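
A handler passed into this class implements DeserializationExceptionHandler; a minimal
skip-everything sketch (the class name is illustrative):

    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class SkipCorruptRecordsHandler implements DeserializationExceptionHandler {
        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            // CONTINUE drops the corrupt record; an exception thrown from this method
            // is now caught above and rethrown as a StreamsException
            return DeserializationHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // nothing to configure
        }
    }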

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 889b6d8..e6facaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -18,11 +18,12 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
 
@@ -33,16 +34,14 @@ import java.util.ArrayDeque;
  * timestamp is monotonically increasing such that once it is advanced, it will not be decremented.
  */
 public class RecordQueue {
-
-    private static final Logger log = LoggerFactory.getLogger(RecordQueue.class);
-
     private final SourceNode source;
     private final TimestampExtractor timestampExtractor;
     private final TopicPartition partition;
     private final ArrayDeque<StampedRecord> fifoQueue;
     private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
-    private final SourceNodeRecordDeserializer recordDeserializer;
+    private final RecordDeserializer recordDeserializer;
     private final ProcessorContext processorContext;
+    private final Logger log;
 
     private long partitionTime = TimestampTracker.NOT_KNOWN;
 
@@ -50,14 +49,16 @@ public class RecordQueue {
                 final SourceNode source,
                 final TimestampExtractor timestampExtractor,
                 final DeserializationExceptionHandler deserializationExceptionHandler,
-                final ProcessorContext processorContext) {
+                final ProcessorContext processorContext,
+                final LogContext logContext) {
         this.partition = partition;
         this.source = source;
         this.timestampExtractor = timestampExtractor;
         this.fifoQueue = new ArrayDeque<>();
         this.timeTracker = new MinTimestampTracker<>();
-        this.recordDeserializer = new SourceNodeRecordDeserializer(source, deserializationExceptionHandler);
+        this.recordDeserializer = new RecordDeserializer(source, deserializationExceptionHandler, logContext);
         this.processorContext = processorContext;
+        this.log = logContext.logger(RecordQueue.class);
     }
 
     /**
@@ -87,12 +88,21 @@ public class RecordQueue {
     int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
         for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
 
-            final ConsumerRecord<Object, Object> record = recordDeserializer.tryDeserialize(processorContext, rawRecord);
+            final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord);
             if (record == null) {
                 continue;
             }
 
-            final long timestamp = timestampExtractor.extract(record, timeTracker.get());
+            final long timestamp;
+            try {
+                timestamp = timestampExtractor.extract(record, timeTracker.get());
+            } catch (final StreamsException internalFatalExtractorException) {
+                throw internalFatalExtractorException;
+            } catch (final Exception fatalUserException) {
+                throw new StreamsException(
+                    String.format("Fatal user code error in TimestampExtractor callback for record %s.", record),
+                    fatalUserException);
+            }
             log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record);
 
             // drop message if TS is invalid, i.e., negative
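
For context, the user code guarded by the new try/catch is any TimestampExtractor
implementation; a minimal payload-based sketch (the payload format is illustrative):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class PayloadTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
            final Object value = record.value();
            if (value instanceof Long) {
                return (Long) value;      // payload carries an epoch-millis timestamp
            }
            // a ClassCastException or NullPointerException thrown from user code here used to
            // escape with an arbitrary type; it now surfaces wrapped in a StreamsException
            return previousTimestamp;     // fall back rather than throw
        }
    }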

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
deleted file mode 100644
index 7fde881..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import static java.lang.String.format;
-import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
-
-class SourceNodeRecordDeserializer implements RecordDeserializer {
-    private final SourceNode sourceNode;
-    private final DeserializationExceptionHandler deserializationExceptionHandler;
-
-    SourceNodeRecordDeserializer(final SourceNode sourceNode,
-                                 final DeserializationExceptionHandler deserializationExceptionHandler) {
-        this.sourceNode = sourceNode;
-        this.deserializationExceptionHandler = deserializationExceptionHandler;
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord) {
-        final Object key;
-        try {
-            key = sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key());
-        } catch (Exception e) {
-            throw new StreamsException(format("Failed to deserialize key for record. topic=%s, partition=%d, offset=%d",
-                                              rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e);
-        }
-
-        final Object value;
-        try {
-            value = sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value());
-        } catch (Exception e) {
-            throw new StreamsException(format("Failed to deserialize value for record. topic=%s, partition=%d, offset=%d",
-                                              rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e);
-        }
-
-        return new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(),
-                                    rawRecord.timestamp(), TimestampType.CREATE_TIME,
-                                    rawRecord.checksum(),
-                                    rawRecord.serializedKeySize(),
-                                    rawRecord.serializedValueSize(), key, value);
-
-    }
-
-    public ConsumerRecord<Object, Object> tryDeserialize(final ProcessorContext processorContext,
-                                                         final ConsumerRecord<byte[], byte[]> rawRecord) {
-
-        // catch and process if we have a deserialization handler
-        try {
-            return deserialize(rawRecord);
-        } catch (final Exception e) {
-            final DeserializationExceptionHandler.DeserializationHandlerResponse response =
-                    deserializationExceptionHandler.handle(processorContext, rawRecord, e);
-            if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) {
-                throw new StreamsException("Deserialization exception handler is set to fail upon" +
-                        " a deserialization error. If you would rather have the streaming pipeline" +
-                        " continue after a deserialization error, please set the " +
-                        DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
-                        e);
-            } else {
-                sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
-            }
-        }
-        return null;
-    }
-
-    public SourceNode sourceNode() {
-        return sourceNode;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
index 511280d..2a8d9a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 
@@ -27,7 +28,12 @@ import java.util.Map;
 interface StateManager extends Checkpointable {
     File baseDir();
 
-    void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback);
+    /**
+     * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name
+     * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name)
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void register(final StateStore store, final StateRestoreCallback stateRestoreCallback);
 
     void flush();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 579561f..33dce9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -79,8 +79,8 @@ public class StateRestorer {
         return persistent;
     }
 
-    void setGlobalRestoreListener(StateRestoreListener globalStateRestoreListener) {
-        this.compositeRestoreListener.setGlobalRestoreListener(globalStateRestoreListener);
+    void setUserRestoreListener(StateRestoreListener userRestoreListener) {
+        this.compositeRestoreListener.setUserRestoreListener(userRestoreListener);
     }
 
     void setRestoredOffset(final long restoredOffset) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index caa0100..4ba860d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -41,7 +41,7 @@ public class StoreChangelogReader implements ChangelogReader {
 
     private final Logger log;
     private final Consumer<byte[], byte[]> consumer;
-    private final StateRestoreListener stateRestoreListener;
+    private final StateRestoreListener userStateRestoreListener;
     private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
     private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
@@ -50,11 +50,11 @@ public class StoreChangelogReader implements ChangelogReader {
 
     public StoreChangelogReader(final String threadId,
                                 final Consumer<byte[], byte[]> consumer,
-                                final StateRestoreListener stateRestoreListener,
+                                final StateRestoreListener userStateRestoreListener,
                                 final LogContext logContext) {
         this.consumer = consumer;
         this.log = logContext.logger(getClass());
-        this.stateRestoreListener = stateRestoreListener;
+        this.userStateRestoreListener = userStateRestoreListener;
     }
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> consumer,
@@ -65,7 +65,7 @@ public class StoreChangelogReader implements ChangelogReader {
 
     @Override
     public void register(final StateRestorer restorer) {
-        restorer.setGlobalRestoreListener(stateRestoreListener);
+        restorer.setUserRestoreListener(userStateRestoreListener);
         stateRestorers.put(restorer.partition(), restorer);
         needsInitializing.put(restorer.partition(), restorer);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6775edb..8c26fa9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -136,7 +136,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         for (final TopicPartition partition : partitions) {
             final SourceNode source = topology.source(partition.topic());
             final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor;
-            final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, processorContext);
+            final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, processorContext, logContext);
             partitionQueues.put(partition, queue);
         }
 
@@ -151,6 +151,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
     }
 
+    @Override
     public boolean initialize() {
         log.trace("Initializing");
         initializeStateStores();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e141c46..8d13558 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -641,7 +641,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                                       final StreamsMetadataState streamsMetadataState,
                                       final long cacheSizeBytes,
                                       final StateDirectory stateDirectory,
-                                      final StateRestoreListener stateRestoreListener) {
+                                      final StateRestoreListener userStateRestoreListener) {
 
         final String threadClientId = clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement();
         final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(metrics,
@@ -666,8 +666,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs);
         final StoreChangelogReader changelogReader = new StoreChangelogReader(threadClientId,
                                                                               restoreConsumer,
-                                                                              stateRestoreListener,
-                                                                                logContext);
+                                                                              userStateRestoreListener,
+                                                                              logContext);
 
         Producer<byte[], byte[]> threadProducer = null;
         if (!eosEnabled) {
@@ -757,6 +757,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
     /**
      * Main event loop for polling and processing records through topologies.
+     * @throws IllegalStateException if a store gets registered after initialization is already finished
+     * @throws StreamsException if the store's changelog does not contain the partition
      */
     private void runLoop() {
         long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
@@ -767,6 +769,10 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         }
     }
 
+    /**
+     * @throws IllegalStateException if a store gets registered after initialization is already finished
+     * @throws StreamsException if the store's changelog does not contain the partition
+     */
     // Visible for testing
     long runOnce(final long recordsProcessedBeforeCommit) {
         long processedBeforeCommit = recordsProcessedBeforeCommit;
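
Deserialization failures hit while processing records in this loop are routed to the configured
handler; a minimal configuration sketch selecting the log-and-continue behavior (application id
and bootstrap servers are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    // skip and log corrupted records instead of failing the stream thread
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
              LogAndContinueExceptionHandler.class);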

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index a481c31..80e5423 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -71,6 +72,8 @@ public interface Task {
     /**
      * Initialize the task and return true if the task is ready to run, i.e., it has no state stores that need restoring.
      * @return true if this task has no state stores that may need restoring.
+     * @throws IllegalStateException if a store gets registered after initialization is already finished
+     * @throws StreamsException if the store's changelog does not contain the partition
      */
     boolean initialize();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index f12ed91..278957e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -239,7 +239,10 @@ class TaskManager {
         this.consumer = consumer;
     }
 
-
+    /**
+     * @throws IllegalStateException if a store gets registered after initialization is already finished
+     * @throws StreamsException if the store's changelog does not contain the partition
+     */
     boolean updateNewAndRestoringTasks() {
         active.initializeNewTasks();
         standby.initializeNewTasks();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 7e24969..929d584 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -72,7 +72,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
         if (root != null) {
             // register the store
-            context.register(root, true, new StateRestoreCallback() {
+            context.register(root, false, new StateRestoreCallback() {
                 @Override
                 public void restore(byte[] key, byte[] value) {
                     // this is a delete
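
The callback body elided above treats a null value as a tombstone; a sketch of that pattern,
assuming a simple map-backed store for illustration:

    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.processor.StateRestoreCallback;

    // Tombstone-handling pattern during restore: a null changelog value means
    // the key was deleted. (The Map-backed store is an assumption for illustration.)
    final Map<Bytes, byte[]> map = new TreeMap<>();
    final StateRestoreCallback callback = new StateRestoreCallback() {
        @Override
        public void restore(final byte[] key, final byte[] value) {
            if (value == null) {
                map.remove(Bytes.wrap(key));     // delete
            } else {
                map.put(Bytes.wrap(key), value); // upsert
            }
        }
    };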

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index beb9ce1..f1aa96f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -108,7 +108,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
             valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         // register the store
-        context.register(root, true, new StateRestoreCallback() {
+        context.register(root, false, new StateRestoreCallback() {
             @Override
             public void restore(byte[] key, byte[] value) {
                 restoring = true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index ded2732..7786348 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -121,7 +121,7 @@ public class TopologyBuilderTest {
         final Serde<String> stringSerde = Serdes.String();
 
         try {
-            builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), new String[]{});
+            builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer());
             fail("Should throw TopologyBuilderException with no topics");
         } catch (TopologyBuilderException tpe) {
             //no-op

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
index 88aba94..e2ea668 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
@@ -124,7 +124,7 @@ public class CompositeRestoreListenerTest {
     @Test
     public void shouldHandleNullReportStoreListener() {
         compositeRestoreListener = new CompositeRestoreListener(batchingStateRestoreCallback);
-        compositeRestoreListener.setGlobalRestoreListener(null);
+        compositeRestoreListener.setUserRestoreListener(null);
 
         compositeRestoreListener.restoreAll(records);
         compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
@@ -138,7 +138,7 @@ public class CompositeRestoreListenerTest {
     @Test
     public void shouldHandleNoRestoreListener() {
         compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
-        compositeRestoreListener.setGlobalRestoreListener(null);
+        compositeRestoreListener.setUserRestoreListener(null);
 
         compositeRestoreListener.restoreAll(records);
         compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
@@ -151,7 +151,7 @@ public class CompositeRestoreListenerTest {
     @Test(expected = UnsupportedOperationException.class)
     public void shouldThrowExceptionWhenSinglePutDirectlyCalled() {
         compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
-        compositeRestoreListener.setGlobalRestoreListener(null);
+        compositeRestoreListener.setUserRestoreListener(null);
 
         compositeRestoreListener.restore(key, value);
     }
@@ -179,7 +179,7 @@ public class CompositeRestoreListenerTest {
 
     private void setUpCompositeRestoreListener(StateRestoreCallback stateRestoreCallback) {
         compositeRestoreListener = new CompositeRestoreListener(stateRestoreCallback);
-        compositeRestoreListener.setGlobalRestoreListener(reportingStoreListener);
+        compositeRestoreListener.setUserRestoreListener(reportingStoreListener);
     }
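
These tests exercise the batching restore path; a store opts into it by supplying a
BatchingStateRestoreCallback, sketched here with illustrative class and field names:

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;

    // Hypothetical batching callback: CompositeRestoreListener routes restoreAll()
    // to it, while calling restore() on the composite itself is unsupported, as
    // the test above verifies.
    public class MapBatchRestoreCallback implements BatchingStateRestoreCallback {
        private final Map<Bytes, byte[]> store = new HashMap<>();

        @Override
        public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
            for (final KeyValue<byte[], byte[]> record : records) {
                store.put(Bytes.wrap(record.key), record.value);
            }
        }

        @Override
        public void restore(final byte[] key, final byte[] value) {
            store.put(Bytes.wrap(key), value);
        }
    }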
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index b438347..0519fb0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -166,7 +166,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize(context);
 
         try {
-            stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), false, new TheStateRestoreCallback());
+            stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), new TheStateRestoreCallback());
             fail("should have raised an illegal argument exception as store is not in the topology");
         } catch (final IllegalArgumentException e) {
             // pass
@@ -177,9 +177,9 @@ public class GlobalStateManagerImplTest {
     public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() {
         stateManager.initialize(context);
         initializeConsumer(2, 1, t1);
-        stateManager.register(store1, false, new TheStateRestoreCallback());
+        stateManager.register(store1, new TheStateRestoreCallback());
         try {
-            stateManager.register(store1, false, new TheStateRestoreCallback());
+            stateManager.register(store1, new TheStateRestoreCallback());
             fail("should have raised an illegal argument exception as store has already been registered");
         } catch (final IllegalArgumentException e) {
             // pass
@@ -190,7 +190,7 @@ public class GlobalStateManagerImplTest {
     public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() {
         stateManager.initialize(context);
         try {
-            stateManager.register(store1, false, new TheStateRestoreCallback());
+            stateManager.register(store1, new TheStateRestoreCallback());
             fail("Should have raised a StreamsException as there are no partition for the store");
         } catch (final StreamsException e) {
             // pass
@@ -204,7 +204,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize(context);
 
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         assertEquals(2, stateRestoreCallback.restored.size());
     }
 
@@ -236,7 +236,7 @@ public class GlobalStateManagerImplTest {
 
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         assertEquals(5, stateRestoreCallback.restored.size());
     }
 
@@ -247,9 +247,9 @@ public class GlobalStateManagerImplTest {
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         // register the stores
         initializeConsumer(1, 1, t1);
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         initializeConsumer(1, 1, t2);
-        stateManager.register(store2, false, stateRestoreCallback);
+        stateManager.register(store2, stateRestoreCallback);
 
         stateManager.flush();
         assertTrue(store1.flushed);
@@ -267,7 +267,7 @@ public class GlobalStateManagerImplTest {
             public void flush() {
                 throw new RuntimeException("KABOOM!");
             }
-        }, false, stateRestoreCallback);
+        }, stateRestoreCallback);
 
         stateManager.flush();
     }
@@ -278,9 +278,9 @@ public class GlobalStateManagerImplTest {
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         // register the stores
         initializeConsumer(1, 1, t1);
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         initializeConsumer(1, 1, t2);
-        stateManager.register(store2, false, stateRestoreCallback);
+        stateManager.register(store2, stateRestoreCallback);
 
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
         assertFalse(store1.isOpen());
@@ -292,7 +292,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         initializeConsumer(1, 1, t1);
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L);
         stateManager.close(expected);
         final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
@@ -308,7 +308,7 @@ public class GlobalStateManagerImplTest {
             public void close() {
                 throw new RuntimeException("KABOOM!");
             }
-        }, false, stateRestoreCallback);
+        }, stateRestoreCallback);
 
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
     }
@@ -317,7 +317,7 @@ public class GlobalStateManagerImplTest {
     public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() {
         stateManager.initialize(context);
         try {
-            stateManager.register(store1, false, null);
+            stateManager.register(store1, null);
             fail("should have thrown due to null callback");
         } catch (IllegalArgumentException e) {
             //pass
@@ -349,7 +349,7 @@ public class GlobalStateManagerImplTest {
                 }
                 super.close();
             }
-        }, false, stateRestoreCallback);
+        }, stateRestoreCallback);
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
 
 
@@ -368,9 +368,9 @@ public class GlobalStateManagerImplTest {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        stateManager.register(store, false, stateRestoreCallback);
+        stateManager.register(store, stateRestoreCallback);
 
-        stateManager.register(store2, false, stateRestoreCallback);
+        stateManager.register(store2, stateRestoreCallback);
 
         try {
             stateManager.close(Collections.<TopicPartition, Long>emptyMap());
@@ -415,9 +415,9 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         initializeConsumer(10, 1, t1);
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         initializeConsumer(20, 1, t2);
-        stateManager.register(store2, false, stateRestoreCallback);
+        stateManager.register(store2, stateRestoreCallback);
 
         final Map<TopicPartition, Long> initialCheckpoint = stateManager.checkpointed();
         stateManager.checkpoint(Collections.singletonMap(t1, 101L));
@@ -444,7 +444,7 @@ public class GlobalStateManagerImplTest {
 
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         final KeyValue<byte[], byte[]> restoredKv = stateRestoreCallback.restored.get(0);
         assertThat(stateRestoreCallback.restored, equalTo(Collections.singletonList(KeyValue.pair(restoredKv.key, restoredKv.value))));
     }
@@ -454,7 +454,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         initializeConsumer(10, 1, t1);
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
 
         final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed();
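
For reference, a recording callback in the spirit of the TheStateRestoreCallback used
throughout these tests might look like this (class name is illustrative; the restored
list matches the assertions above):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.StateRestoreCallback;

    // Recording callback: captures every restored key-value pair for later assertions.
    class RecordingRestoreCallback implements StateRestoreCallback {
        final List<KeyValue<byte[], byte[]>> restored = new ArrayList<>();

        @Override
        public void restore(final byte[] key, final byte[] value) {
            restored.add(KeyValue.pair(key, value));
        }
    }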

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 4ece443..63783a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
@@ -51,6 +52,7 @@ import static org.junit.Assert.fail;
 
 public class GlobalStateTaskTest {
 
+    private final LogContext logContext = new LogContext();
     private Map<TopicPartition, Long> offsets;
     private GlobalStateUpdateTask globalStateTask;
     private GlobalStateManagerStub stateMgr;
@@ -92,7 +94,7 @@ public class GlobalStateTaskTest {
         offsets.put(t1, 50L);
         offsets.put(t2, 100L);
         stateMgr = new GlobalStateManagerStub(storeNames, offsets);
-        globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, new LogAndFailExceptionHandler());
+        globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, new LogAndFailExceptionHandler(), logContext);
     }
 
     @Test
@@ -175,8 +177,12 @@ public class GlobalStateTaskTest {
 
     @Test
     public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() throws Exception {
-        final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr,
-            new LogAndContinueExceptionHandler());
+        final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(
+            topology,
+            context,
+            stateMgr,
+            new LogAndContinueExceptionHandler(),
+            logContext);
         final byte[] key = new LongSerializer().serialize("t2", 1L);
         final byte[] recordValue = new IntegerSerializer().serialize("t2", 10);
 
@@ -185,8 +191,12 @@ public class GlobalStateTaskTest {
 
     @Test
     public void shouldNotThrowStreamsExceptionWhenValueDeserializationFails() throws Exception {
-        final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr,
-            new LogAndContinueExceptionHandler());
+        final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(
+            topology,
+            context,
+            stateMgr,
+            new LogAndContinueExceptionHandler(),
+            logContext);
         final byte[] key = new IntegerSerializer().serialize("t2", 1);
         final byte[] recordValue = new LongSerializer().serialize("t2", 10L);
 
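
The LogContext now passed into the task prefixes every log line emitted through it; a minimal
usage sketch (the prefix string is illustrative):

    import org.apache.kafka.common.utils.LogContext;
    import org.slf4j.Logger;

    // A LogContext stamps a fixed prefix onto every message of the loggers it
    // creates, which is why it is now threaded through the task constructors.
    final LogContext logContext = new LogContext("[global-task] "); // illustrative prefix
    final Logger log = logContext.logger(GlobalStateUpdateTask.class);
    log.debug("initialized"); // logged as: [global-task] initialized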

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 95636ec..e223699 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -295,6 +295,7 @@ public class InternalTopologyBuilderTest {
         } catch (final TopologyException expected) { /* ok */ }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testAddStateStore() {
         final StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
@@ -344,6 +345,7 @@ public class InternalTopologyBuilderTest {
         assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testTopicGroupsByStateStore() {
         builder.setApplicationId("X");
@@ -470,6 +472,7 @@ public class InternalTopologyBuilderTest {
         builder.setApplicationId(null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAddNullStateStoreSupplier() throws Exception {
         builder.addStateStore((StateStoreSupplier) null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index d9f38eb..00a2c35 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.test.MockSourceNode;
@@ -35,14 +36,27 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 
 public class PartitionGroupTest {
+    private final LogContext logContext = new LogContext();
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
     private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
     private final String[] topics = {"topic"};
     private final TopicPartition partition1 = new TopicPartition(topics[0], 1);
     private final TopicPartition partition2 = new TopicPartition(topics[0], 2);
-    private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null);
-    private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null);
+    private final RecordQueue queue1 = new RecordQueue(
+        partition1,
+        new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+        timestampExtractor,
+        new LogAndContinueExceptionHandler(),
+        null,
+        logContext);
+    private final RecordQueue queue2 = new RecordQueue(
+        partition2,
+        new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+        timestampExtractor,
+        new LogAndContinueExceptionHandler(),
+        null,
+        logContext);
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);