Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/28 10:00:34 UTC

[1/2] kafka git commit: KAFKA-5949; User Callback Exceptions need to be handled properly

Repository: kafka
Updated Branches:
  refs/heads/trunk 2703fda52 -> e5f2471c5


http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index f3135d5..ede6dd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -105,7 +105,7 @@ public class ProcessorStateManagerTest {
         final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
 
         try {
-            stateMgr.register(persistentStore, true, batchingRestoreCallback);
+            stateMgr.register(persistentStore, batchingRestoreCallback);
             stateMgr.updateStandbyStates(persistentStorePartition, Collections.singletonList(consumerRecord));
             assertThat(batchingRestoreCallback.getRestoredRecords().size(), is(1));
             assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
@@ -123,7 +123,7 @@ public class ProcessorStateManagerTest {
         final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
 
         try {
-            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+            stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
             stateMgr.updateStandbyStates(persistentStorePartition, Collections.singletonList(consumerRecord));
             assertThat(persistentStore.keys.size(), is(1));
             assertTrue(persistentStore.keys.contains(intKey));
@@ -153,7 +153,7 @@ public class ProcessorStateManagerTest {
                 logContext);
 
         try {
-            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+            stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new TopicPartition(persistentStoreTopicName, 2)));
         } finally {
             stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
@@ -180,7 +180,7 @@ public class ProcessorStateManagerTest {
                 logContext);
 
         try {
-            stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
+            stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new TopicPartition(nonPersistentStoreTopicName, 2)));
         } finally {
             stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
@@ -229,9 +229,9 @@ public class ProcessorStateManagerTest {
                 logContext);
 
         try {
-            stateMgr.register(store1, true, store1.stateRestoreCallback);
-            stateMgr.register(store2, true, store2.stateRestoreCallback);
-            stateMgr.register(store3, true, store3.stateRestoreCallback);
+            stateMgr.register(store1, store1.stateRestoreCallback);
+            stateMgr.register(store2, store2.stateRestoreCallback);
+            stateMgr.register(store3, store3.stateRestoreCallback);
 
             final Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed();
 
@@ -261,7 +261,7 @@ public class ProcessorStateManagerTest {
             false,
                 logContext);
         try {
-            stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
+            stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
 
             assertNull(stateMgr.getStore("noSuchStore"));
             assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName));
@@ -299,8 +299,8 @@ public class ProcessorStateManagerTest {
             // make sure the checkpoint file isn't deleted
             assertTrue(checkpointFile.exists());
 
-            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
-            stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
+            stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
+            stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
         } finally {
             // close the state manager with the ack'ed offsets
             stateMgr.flush();
@@ -330,7 +330,7 @@ public class ProcessorStateManagerTest {
             changelogReader,
             false,
                 logContext);
-        stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback);
+        stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
         assertNotNull(stateMgr.getStore(nonPersistentStoreName));
     }
 
@@ -349,7 +349,7 @@ public class ProcessorStateManagerTest {
             changelogReader,
             false,
                 logContext);
-        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+        stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
         stateMgr.close(null);
         final Map<TopicPartition, Long> read = checkpoint.read();
         assertThat(read, equalTo(offsets));
@@ -366,7 +366,7 @@ public class ProcessorStateManagerTest {
             changelogReader,
             false,
                 logContext);
-        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+        stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
 
         stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L));
         final Map<TopicPartition, Long> read = checkpoint.read();
@@ -385,7 +385,7 @@ public class ProcessorStateManagerTest {
             false,
                 logContext);
 
-        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+        stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
         final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
         stateMgr.updateStandbyStates(persistentStorePartition,
                                      Collections.singletonList(
@@ -416,7 +416,7 @@ public class ProcessorStateManagerTest {
             false,
                 logContext);
 
-        stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
+        stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
         stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
 
         final Map<TopicPartition, Long> read = checkpoint.read();
@@ -435,7 +435,7 @@ public class ProcessorStateManagerTest {
             false,
                 logContext);
 
-        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+        stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
 
         stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 987L));
 
@@ -457,7 +457,7 @@ public class ProcessorStateManagerTest {
                 logContext);
 
         try {
-            stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), true, null);
+            stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), null);
             fail("should have thrown illegal argument exception when store name same as checkpoint file");
         } catch (final IllegalArgumentException e) {
             //pass
@@ -476,10 +476,10 @@ public class ProcessorStateManagerTest {
             false,
                 logContext);
 
-        stateManager.register(mockStateStore, false, null);
+        stateManager.register(mockStateStore, null);
 
         try {
-            stateManager.register(mockStateStore, false, null);
+            stateManager.register(mockStateStore, null);
             fail("should have thrown illegal argument exception when store with same name already registered");
         } catch (final IllegalArgumentException e) {
             // pass
@@ -506,7 +506,7 @@ public class ProcessorStateManagerTest {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        stateManager.register(stateStore, false, stateStore.stateRestoreCallback);
+        stateManager.register(stateStore, stateStore.stateRestoreCallback);
 
         try {
             stateManager.flush();
@@ -535,7 +535,7 @@ public class ProcessorStateManagerTest {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        stateManager.register(stateStore, false, stateStore.stateRestoreCallback);
+        stateManager.register(stateStore, stateStore.stateRestoreCallback);
 
         try {
             stateManager.close(Collections.<TopicPartition, Long>emptyMap());
@@ -571,8 +571,8 @@ public class ProcessorStateManagerTest {
                 flushedStore.set(true);
             }
         };
-        stateManager.register(stateStore1, false, stateStore1.stateRestoreCallback);
-        stateManager.register(stateStore2, false, stateStore2.stateRestoreCallback);
+        stateManager.register(stateStore1, stateStore1.stateRestoreCallback);
+        stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
 
         try {
             stateManager.flush();
@@ -606,8 +606,8 @@ public class ProcessorStateManagerTest {
                 closedStore.set(true);
             }
         };
-        stateManager.register(stateStore1, false, stateStore1.stateRestoreCallback);
-        stateManager.register(stateStore2, false, stateStore2.stateRestoreCallback);
+        stateManager.register(stateStore1, stateStore1.stateRestoreCallback);
+        stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
 
         try {
             stateManager.close(Collections.<TopicPartition, Long>emptyMap());

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
new file mode 100644
index 0000000..3a4efba
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class RecordDeserializerTest {
+
+    private final ConsumerRecord<byte[], byte[]> rawRecord = new ConsumerRecord<>("topic",
+        1,
+        1,
+        10,
+        TimestampType.LOG_APPEND_TIME,
+        5,
+        3,
+        5,
+        new byte[0],
+        new byte[0]);
+
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() {
+        final RecordDeserializer recordDeserializer = new RecordDeserializer(
+            new TheSourceNode(false, false, "key", "value"), null, new LogContext());
+        final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(null, rawRecord);
+        assertEquals(rawRecord.topic(), record.topic());
+        assertEquals(rawRecord.partition(), record.partition());
+        assertEquals(rawRecord.offset(), record.offset());
+        assertEquals(rawRecord.checksum(), record.checksum());
+        assertEquals("key", record.key());
+        assertEquals("value", record.value());
+        assertEquals(rawRecord.timestamp(), record.timestamp());
+        assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+    }
+
+    static class TheSourceNode extends SourceNode {
+        private final boolean keyThrowsException;
+        private final boolean valueThrowsException;
+        private final Object key;
+        private final Object value;
+
+        TheSourceNode(final boolean keyThrowsException, final boolean valueThrowsException) {
+            this(keyThrowsException, valueThrowsException, null, null);
+        }
+
+        @SuppressWarnings("unchecked")
+        TheSourceNode(final boolean keyThrowsException,
+                      final boolean valueThrowsException,
+                      final Object key,
+                      final Object value) {
+            super("", Collections.EMPTY_LIST, null, null);
+            this.keyThrowsException = keyThrowsException;
+            this.valueThrowsException = valueThrowsException;
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public Object deserializeKey(final String topic, final Headers headers, final byte[] data) {
+            if (keyThrowsException) {
+                throw new RuntimeException();
+            }
+            return key;
+        }
+
+        @Override
+        public Object deserializeValue(final String topic, final Headers headers, final byte[] data) {
+            if (valueThrowsException) {
+                throw new RuntimeException();
+            }
+            return value;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index c33a9c4..c7af928 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
@@ -47,6 +44,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class RecordQueueTest {
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
@@ -56,12 +56,20 @@ public class RecordQueueTest {
     final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
             new RecordCollectorImpl(null, null,  new LogContext("record-queue-test ")));
     private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer);
-    private final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1),
-            mockSourceNodeWithMetrics,
-            timestampExtractor, new LogAndFailExceptionHandler(), context);
-    private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(new TopicPartition(topics[0], 1),
-            mockSourceNodeWithMetrics,
-            timestampExtractor, new LogAndContinueExceptionHandler(), context);
+    private final RecordQueue queue = new RecordQueue(
+        new TopicPartition(topics[0], 1),
+        mockSourceNodeWithMetrics,
+        timestampExtractor,
+        new LogAndFailExceptionHandler(),
+        context,
+        new LogContext());
+    private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(
+        new TopicPartition(topics[0], 1),
+        mockSourceNodeWithMetrics,
+        timestampExtractor,
+        new LogAndContinueExceptionHandler(),
+        context,
+        new LogContext());
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
@@ -196,7 +204,8 @@ public class RecordQueueTest {
                                                   new MockSourceNode<>(topics, intDeserializer, intDeserializer),
                                                   new FailOnInvalidTimestamp(),
                                                   new LogAndContinueExceptionHandler(),
-                                  null);
+                                                  null,
+                                                  new LogContext());
         queue.addRawRecords(records);
     }
 
@@ -209,7 +218,8 @@ public class RecordQueueTest {
                                                   new MockSourceNode<>(topics, intDeserializer, intDeserializer),
                                                   new LogAndSkipOnInvalidTimestamp(),
                                                   new LogAndContinueExceptionHandler(),
-                                  null);
+                                                  null,
+                                                  new LogContext());
         queue.addRawRecords(records);
 
         assertEquals(0, queue.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
deleted file mode 100644
index 821dbe9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.junit.Test;
-
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-
-public class SourceNodeRecordDeserializerTest {
-
-    private final ConsumerRecord<byte[], byte[]> rawRecord = new ConsumerRecord<>("topic",
-                                                                                  1,
-                                                                                  1,
-                                                                                  10,
-                                                                                  TimestampType.LOG_APPEND_TIME,
-                                                                                  5,
-                                                                                  3,
-                                                                                  5,
-                                                                                  new byte[0],
-                                                                                  new byte[0]);
-
-
-    @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeyFailsToDeserialize() {
-        final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
-                new TheSourceNode(true, false), null);
-        recordDeserializer.deserialize(rawRecord);
-    }
-
-    @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeyValueFailsToDeserialize() {
-        final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
-                new TheSourceNode(false, true), null);
-        recordDeserializer.deserialize(rawRecord);
-    }
-
-    @Test
-    public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() {
-        final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
-                new TheSourceNode(false, false, "key", "value"), null);
-        final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(rawRecord);
-        assertEquals(rawRecord.topic(), record.topic());
-        assertEquals(rawRecord.partition(), record.partition());
-        assertEquals(rawRecord.offset(), record.offset());
-        assertEquals(rawRecord.checksum(), record.checksum());
-        assertEquals("key", record.key());
-        assertEquals("value", record.value());
-        assertEquals(rawRecord.timestamp(), record.timestamp());
-        assertEquals(TimestampType.CREATE_TIME, record.timestampType());
-    }
-
-    static class TheSourceNode extends SourceNode {
-        private final boolean keyThrowsException;
-        private final boolean valueThrowsException;
-        private final Object key;
-        private final Object value;
-
-        TheSourceNode(final boolean keyThrowsException, final boolean valueThrowsException) {
-            this(keyThrowsException, valueThrowsException, null, null);
-        }
-
-        @SuppressWarnings("unchecked")
-        TheSourceNode(final boolean keyThrowsException,
-                      final boolean valueThrowsException,
-                      final Object key,
-                      final Object value) {
-            super("", Collections.EMPTY_LIST, null, null);
-            this.keyThrowsException = keyThrowsException;
-            this.valueThrowsException = valueThrowsException;
-            this.key = key;
-            this.value = value;
-        }
-
-        @Override
-        public Object deserializeKey(final String topic, final Headers headers, final byte[] data) {
-            if (keyThrowsException) {
-                throw new RuntimeException();
-            }
-            return key;
-        }
-
-        @Override
-        public Object deserializeValue(final String topic, final Headers headers, final byte[] data) {
-            if (valueThrowsException) {
-                throw new RuntimeException();
-            }
-            return value;
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 7a7b119..86a1af1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -335,7 +335,7 @@ public class StandbyTaskTest {
         restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
                 new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
         final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
-        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().count("my-store");
+        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().count();
 
         final StreamsConfig config = createConfig(baseDir);
         final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
index b9e6fee..47a0015 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
@@ -33,7 +33,7 @@ public class StateManagerStub implements StateManager {
     }
 
     @Override
-    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {}
+    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {}
 
     @Override
     public void flush() {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index c782790..62da23b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -40,7 +40,7 @@ public class StateRestorerTest {
 
     @Before
     public void setUp() {
-        compositeRestoreListener.setGlobalRestoreListener(reportingListener);
+        compositeRestoreListener.setUserRestoreListener(reportingListener);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 5a3fa69..5da0a64 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -58,7 +58,7 @@ public class StoreChangelogReaderTest {
 
     @Before
     public void setUp() {
-        restoreListener.setGlobalRestoreListener(stateRestoreListener);
+        restoreListener.setUserRestoreListener(stateRestoreListener);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index d9a215c..cd37fab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -31,6 +32,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.PartitionGrouper;
@@ -38,6 +40,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -781,7 +784,7 @@ public class StreamPartitionAssignorTest {
             // force repartitioning for aggregation
             .selectKey(new KeyValueMapper<Object, Object, Object>() {
                 @Override
-                public Object apply(Object key, Object value) {
+                public Object apply(final Object key, final Object value) {
                     return null;
                 }
             })
@@ -789,14 +792,14 @@ public class StreamPartitionAssignorTest {
 
             // Task 2 (should get created):
             // create repartioning and changelog topic as task 1 exists
-            .count("count")
+            .count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count"))
 
             // force repartitioning for join, but second join input topic unknown
             // -> internal repartitioning topic should not get created
             .toStream()
             .map(new KeyValueMapper<Object, Long, KeyValue<Object, Object>>() {
                 @Override
-                public KeyValue<Object, Object> apply(Object key, Long value) {
+                public KeyValue<Object, Object> apply(final Object key, final Long value) {
                     return null;
                 }
             });
@@ -809,7 +812,7 @@ public class StreamPartitionAssignorTest {
             // -> thus should not create internal repartitioning topic
             .selectKey(new KeyValueMapper<Object, Object, Object>() {
                 @Override
-                public Object apply(Object key, Object value) {
+                public Object apply(final Object key, final Object value) {
                     return null;
                 }
             })
@@ -820,7 +823,7 @@ public class StreamPartitionAssignorTest {
                 stream1,
                 new ValueJoiner() {
                     @Override
-                    public Object apply(Object value1, Object value2) {
+                    public Object apply(final Object value1, final Object value2) {
                         return null;
                     }
                 },
@@ -923,7 +926,7 @@ public class StreamPartitionAssignorTest {
         final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
         internalTopologyBuilder.setApplicationId(applicationId);
 
-        builder.stream("topic1").groupByKey().count("count");
+        builder.stream("topic1").groupByKey().count();
 
         final UUID uuid = UUID.randomUUID();
         mockThreadDataProvider(Collections.<TaskId>emptySet(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 91dd422..bbed615 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -573,7 +573,7 @@ public class StreamTaskTest {
         final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) {
             @Override
             public void init(final ProcessorContext context, final StateStore root) {
-                context.register(root, true, null);
+                context.register(root, false, null);
             }
 
             @Override
@@ -639,7 +639,7 @@ public class StreamTaskTest {
         final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) {
             @Override
             public void init(final ProcessorContext context, final StateStore root) {
-                context.register(root, true, null);
+                context.register(root, false, null);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
index b4b67f7..6677084 100644
--- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
@@ -51,7 +51,7 @@ public class GlobalStateManagerStub implements GlobalStateManager {
     }
 
     @Override
-    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {
+    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index edbff13..37a8ab3 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -147,7 +147,9 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
     }
 
     @Override
-    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback func) {
+    public void register(final StateStore store,
+                         final boolean deprecatedAndIgnoredLoggingEnabled,
+                         final StateRestoreCallback func) {
         storeMap.put(store.name(), store);
         restoreFuncs.put(store.name(), func);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index 5d6a4fa..a4b3118 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -27,26 +27,25 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Map;
 
+@SuppressWarnings("deprecation")
 public class MockStateStoreSupplier implements StateStoreSupplier {
     private String name;
     private boolean persistent;
     private boolean loggingEnabled;
-    private MockStateStore stateStore;
 
-    public MockStateStoreSupplier(String name, boolean persistent) {
+    public MockStateStoreSupplier(final String name,
+                                  final boolean persistent) {
         this(name, persistent, true);
     }
 
-    public MockStateStoreSupplier(String name, boolean persistent, boolean loggingEnabled) {
+    public MockStateStoreSupplier(final String name,
+                                  final boolean persistent,
+                                  final boolean loggingEnabled) {
         this.name = name;
         this.persistent = persistent;
         this.loggingEnabled = loggingEnabled;
     }
 
-    public MockStateStoreSupplier(final MockStateStore stateStore) {
-        this.stateStore = stateStore;
-    }
-
     @Override
     public String name() {
         return name;
@@ -54,14 +53,7 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
 
     @Override
     public StateStore get() {
-        if (stateStore != null) {
-            return stateStore;
-        }
-        if (loggingEnabled) {
-            return new MockStateStore(name, persistent).enableLogging();
-        } else {
-            return new MockStateStore(name, persistent);
-        }
+        return new MockStateStore(name, persistent);
     }
 
     @Override
@@ -78,30 +70,26 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
         private final String name;
         private final boolean persistent;
 
-        public boolean loggingEnabled = false;
         public boolean initialized = false;
         public boolean flushed = false;
         public boolean closed = true;
         public final ArrayList<Integer> keys = new ArrayList<>();
 
-        public MockStateStore(String name, boolean persistent) {
+        public MockStateStore(final String name,
+                              final boolean persistent) {
             this.name = name;
             this.persistent = persistent;
         }
 
-        public MockStateStore enableLogging() {
-            loggingEnabled = true;
-            return this;
-        }
-
         @Override
         public String name() {
             return name;
         }
 
         @Override
-        public void init(ProcessorContext context, StateStore root) {
-            context.register(root, loggingEnabled, stateRestoreCallback);
+        public void init(final ProcessorContext context,
+                         final StateStore root) {
+            context.register(root, false, stateRestoreCallback);
             initialized = true;
             closed = false;
         }
@@ -130,7 +118,8 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
             private final Deserializer<Integer> deserializer = new IntegerDeserializer();
 
             @Override
-            public void restore(byte[] key, byte[] value) {
+            public void restore(final byte[] key,
+                                final byte[] value) {
                 keys.add(deserializer.deserialize("", key));
             }
         };

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 1b9cfed..bc56866 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -84,7 +84,9 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
-    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {
+    public void register(final StateStore store,
+                         final boolean deprecatedAndIgnoredLoggingEnabled,
+                         final StateRestoreCallback stateRestoreCallback) {
         // no-op
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index babf704..1d91b52 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -227,8 +227,9 @@ public class ProcessorTopologyTestDriver {
                                                                                    stateRestoreListener);
             globalStateTask = new GlobalStateUpdateTask(globalTopology,
                                                         new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
-                                                        stateManager, new LogAndContinueExceptionHandler()
-            );
+                                                        stateManager,
+                                                        new LogAndContinueExceptionHandler(),
+                                                        new LogContext());
             globalStateTask.initialize();
         }
 


[2/2] kafka git commit: KAFKA-5949; User Callback Exceptions need to be handled properly

Posted by da...@apache.org.
KAFKA-5949; User Callback Exceptions need to be handled properly

 - catch user exceptions in user callbacks (TimestampExtractor, DeserializationExceptionHandler, StateRestoreListener) and wrap them with StreamsException (sketched below)

Additional cleanup:
 - rename globalRestoreListener to userRestoreListener
 - remove unnecessary interface -> collapse SourceNodeRecordDeserializer into RecordDeserializer
 - remove unused parameter loggingEnabled from ProcessorContext#register
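
For illustration, a minimal self-contained sketch of the wrapping pattern described above, shown for the TimestampExtractor callback; the helper class and message text are hypothetical, not the exact internal code touched by this patch:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.StreamsException;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Illustrative helper only -- not the internal code changed by this patch.
    final class UserCallbackGuard {
        static long extractTimestamp(final TimestampExtractor extractor,
                                     final ConsumerRecord<Object, Object> record,
                                     final long previousTimestamp) {
            try {
                return extractor.extract(record, previousTimestamp);
            } catch (final RuntimeException fatalUserException) {
                // any failure in user code surfaces to the caller as a StreamsException
                throw new StreamsException("Fatal user code error in TimestampExtractor callback",
                                           fatalUserException);
            }
        }
    }

The same try/catch-and-rethrow shape applies to the deserialization and restore-listener callbacks named above.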

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>

Closes #3939 from mjsax/kafka-5949-exceptions-user-callbacks


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e5f2471c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e5f2471c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e5f2471c

Branch: refs/heads/trunk
Commit: e5f2471c548fc490a42dd0321bcf7fcdd4ddc52d
Parents: 2703fda
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Sep 28 11:00:31 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Sep 28 11:00:31 2017 +0100

----------------------------------------------------------------------
 .../errors/LogAndContinueExceptionHandler.java  |   6 +-
 .../errors/LogAndFailExceptionHandler.java      |   8 +-
 .../kafka/streams/kstream/ValueTransformer.java |   2 +
 .../internals/KStreamTransformValues.java       |   6 +-
 .../streams/processor/ProcessorContext.java     |  11 +-
 .../kafka/streams/processor/StateStore.java     |   4 +
 .../internals/AbstractProcessorContext.java     |   4 +-
 .../processor/internals/AbstractTask.java       |   6 +-
 .../processor/internals/AssignedTasks.java      |   5 +
 .../internals/CompositeRestoreListener.java     |  52 +++++++--
 .../processor/internals/GlobalStateManager.java |   6 +
 .../internals/GlobalStateManagerImpl.java       |   1 -
 .../internals/GlobalStateUpdateTask.java        |  21 ++--
 .../processor/internals/GlobalStreamThread.java |   7 +-
 .../internals/InternalTopologyBuilder.java      |   8 +-
 .../internals/ProcessorStateManager.java        |  13 +--
 .../processor/internals/RecordDeserializer.java |  70 +++++++++++-
 .../processor/internals/RecordQueue.java        |  28 +++--
 .../internals/SourceNodeRecordDeserializer.java |  90 ---------------
 .../processor/internals/StateManager.java       |   8 +-
 .../processor/internals/StateRestorer.java      |   4 +-
 .../internals/StoreChangelogReader.java         |   8 +-
 .../streams/processor/internals/StreamTask.java |   3 +-
 .../processor/internals/StreamThread.java       |  12 +-
 .../kafka/streams/processor/internals/Task.java |   3 +
 .../processor/internals/TaskManager.java        |   5 +-
 .../state/internals/InMemoryKeyValueStore.java  |   2 +-
 .../streams/state/internals/MemoryLRUCache.java |   2 +-
 .../streams/processor/TopologyBuilderTest.java  |   2 +-
 .../internals/CompositeRestoreListenerTest.java |   8 +-
 .../internals/GlobalStateManagerImplTest.java   |  42 +++----
 .../internals/GlobalStateTaskTest.java          |  20 +++-
 .../internals/InternalTopologyBuilderTest.java  |   3 +
 .../processor/internals/PartitionGroupTest.java |  18 ++-
 .../internals/ProcessorStateManagerTest.java    |  50 ++++-----
 .../internals/RecordDeserializerTest.java       |  98 ++++++++++++++++
 .../processor/internals/RecordQueueTest.java    |  32 ++++--
 .../SourceNodeRecordDeserializerTest.java       | 111 -------------------
 .../processor/internals/StandbyTaskTest.java    |   2 +-
 .../processor/internals/StateManagerStub.java   |   2 +-
 .../processor/internals/StateRestorerTest.java  |   2 +-
 .../internals/StoreChangelogReaderTest.java     |   2 +-
 .../internals/StreamPartitionAssignorTest.java  |  15 ++-
 .../processor/internals/StreamTaskTest.java     |   4 +-
 .../kafka/test/GlobalStateManagerStub.java      |   2 +-
 .../apache/kafka/test/MockProcessorContext.java |   4 +-
 .../kafka/test/MockStateStoreSupplier.java      |  39 +++----
 .../apache/kafka/test/NoOpProcessorContext.java |   4 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   5 +-
 49 files changed, 478 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
index dde4b52..b2ef45b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
@@ -38,9 +38,9 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionH
                                                  final Exception exception) {
 
         log.warn("Exception caught during Deserialization, " +
-                        "taskId: {}, topic: {}, partition: {}, offset: {}",
-                context.taskId(), record.topic(), record.partition(), record.offset(),
-                exception);
+                 "taskId: {}, topic: {}, partition: {}, offset: {}",
+                 context.taskId(), record.topic(), record.partition(), record.offset(),
+                 exception);
 
         return DeserializationHandlerResponse.CONTINUE;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
index 23557a3..60af32f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
@@ -37,10 +37,10 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandl
                                                  final ConsumerRecord<byte[], byte[]> record,
                                                  final Exception exception) {
 
-        log.warn("Exception caught during Deserialization, " +
-                        "taskId: {}, topic: {}, partition: {}, offset: {}",
-                context.taskId(), record.topic(), record.partition(), record.offset(),
-                exception);
+        log.error("Exception caught during Deserialization, " +
+                  "taskId: {}, topic: {}, partition: {}, offset: {}",
+                  context.taskId(), record.topic(), record.partition(), record.offset(),
+                  exception);
 
         return DeserializationHandlerResponse.FAIL;
     }
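
As context for the two handler changes above: deserialization exception handlers are plugged in through StreamsConfig. A minimal sketch, assuming the KIP-161 config key; the application id and bootstrap server are placeholders:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

    public class DeserializationHandlerConfig {
        public static Properties config() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "handler-demo");       // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            // swap in LogAndFailExceptionHandler.class to fail fast instead of skipping records
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                      LogAndContinueExceptionHandler.class);
            return props;
        }
    }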

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 5463a76..0a8e890 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -62,6 +62,8 @@ public interface ValueTransformer<V, VR> {
      * {@code ValueTransformer} and will result in an {@link StreamsException exception}.
      *
      * @param context the context
+     * @throws IllegalStateException If store gets registered after initialization is already finished
+     * @throws StreamsException if the store's change log does not contain the partition
      */
     void init(final ProcessorContext context);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index ab1c302..55c16cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -91,8 +91,10 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
                     }
 
                     @Override
-                    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {
-                        context.register(store, loggingEnabled, stateRestoreCallback);
+                    public void register(final StateStore store,
+                                         final boolean deprecatedAndIgnoredLoggingEnabled,
+                                         final StateRestoreCallback stateRestoreCallback) {
+                        context.register(store, deprecatedAndIgnoredLoggingEnabled, stateRestoreCallback);
                     }
 
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index cdf1612..385d641 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
 
 import java.io.File;
 import java.util.Map;
@@ -75,8 +76,14 @@ public interface ProcessorContext {
      * Registers and possibly restores the specified storage engine.
      *
      * @param store the storage engine
-     */
-    void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback);
+     * @param loggingEnabledIsDeprecatedAndIgnored deprecated parameter {@code loggingEnabled} is ignored:
+     *                                             if you want to enable logging on a state stores call
+     *                                             {@link org.apache.kafka.streams.state.StoreBuilder#withLoggingEnabled(Map)}
+     *                                             when creating the store
+     * @throws IllegalStateException If store gets registered after initialized is already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback);
 
     /**
      * Get the state store given the store name.
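
A usage note on the new javadoc above: with the loggingEnabled flag ignored, changelogging is configured on the store itself via StoreBuilder#withLoggingEnabled, as the javadoc points out. A minimal sketch; the store name and changelog topic config are placeholders:

    import java.util.Collections;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class LoggingEnabledStore {
        public static StoreBuilder<KeyValueStore<String, Long>> countStore() {
            return Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("count-store"),  // placeholder name
                    Serdes.String(),
                    Serdes.Long())
                // changelogging is enabled on the builder, not via register(...)
                .withLoggingEnabled(Collections.singletonMap("cleanup.policy", "compact"));
        }
    }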

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 3925951..cb8139c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.processor;
 
+import org.apache.kafka.streams.errors.StreamsException;
+
 /**
  * A storage engine for managing state maintained by a stream processor.
  *
@@ -36,6 +38,8 @@ public interface StateStore {
 
     /**
      * Initializes this state store
+     * @throws IllegalStateException If store gets registered after initialized is already finished
+     * @throws StreamsException if the store's change log does not contain the partition
      */
     void init(ProcessorContext context, StateStore root);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 9e853fd..410212e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -93,13 +93,13 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
 
     @Override
     public void register(final StateStore store,
-                         final boolean loggingEnabled,
+                         final boolean deprecatedAndIgnoredLoggingEnabled,
                          final StateRestoreCallback stateRestoreCallback) {
         if (initialized) {
             throw new IllegalStateException("Can only create state stores during initialization.");
         }
         Objects.requireNonNull(store, "store must not be null");
-        stateManager.register(store, loggingEnabled, stateRestoreCallback);
+        stateManager.register(store, stateRestoreCallback);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 6734da6..c24686e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -90,7 +90,7 @@ public abstract class AbstractTask implements Task {
                 topology.storeToChangelogTopic(),
                 changelogReader,
                 eosEnabled,
-                    logContext);
+                logContext);
         } catch (final IOException e) {
             throw new ProcessorStateException(String.format("%sError while creating the state manager", logPrefix), e);
         }
@@ -196,6 +196,10 @@ public abstract class AbstractTask implements Task {
         stateMgr.flush();
     }
 
+    /**
+     * @throws IllegalStateException if the store gets registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
     void initializeStateStores() {
         if (topology.stateStores().isEmpty()) {
             return;

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index e51ebd7..fcb717d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 
@@ -107,6 +108,10 @@ class AssignedTasks {
         return partitions;
     }
 
+    /**
+     * @throws IllegalStateException if the store gets registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
     void initializeNewTasks() {
         if (!created.isEmpty()) {
             log.debug("Initializing {}s {}", taskTypeName, created.keySet());

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
index 138be77..a1c2f7f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -32,7 +33,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
     public static final NoOpStateRestoreListener NO_OP_STATE_RESTORE_LISTENER = new NoOpStateRestoreListener();
     private final BatchingStateRestoreCallback internalBatchingRestoreCallback;
     private final StateRestoreListener storeRestoreListener;
-    private StateRestoreListener globalRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
+    private StateRestoreListener userRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
 
     CompositeRestoreListener(final StateRestoreCallback stateRestoreCallback) {
 
@@ -45,31 +46,66 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
         internalBatchingRestoreCallback = getBatchingRestoreCallback(stateRestoreCallback);
     }
 
+    /**
+     * @throws StreamsException if the user-provided {@link StateRestoreListener} raises an exception in
+     * {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)}
+     */
     @Override
     public void onRestoreStart(final TopicPartition topicPartition,
                                final String storeName,
                                final long startingOffset,
                                final long endingOffset) {
-        globalRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+        try {
+            userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+        } catch (final Exception fatalUserException) {
+            throw new StreamsException(
+                String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+                              storeName,
+                              topicPartition),
+                fatalUserException);
+        }
         storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
     }
 
+    /**
+     * @throws StreamsException if the user-provided {@link StateRestoreListener} raises an exception in
+     * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}
+     */
     @Override
     public void onBatchRestored(final TopicPartition topicPartition,
                                 final String storeName,
                                 final long batchEndOffset,
                                 final long numRestored) {
-        globalRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+        try {
+            userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+        } catch (final Exception fatalUserException) {
+            throw new StreamsException(
+                String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+                    storeName,
+                    topicPartition),
+                fatalUserException);
+        }
         storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
     }
 
+    /**
+     * @throws StreamsException if the user-provided {@link StateRestoreListener} raises an exception in
+     * {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}
+     */
     @Override
     public void onRestoreEnd(final TopicPartition topicPartition,
                              final String storeName,
                              final long totalRestored) {
-        globalRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+        try {
+            userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+        } catch (final Exception fatalUserException) {
+            throw new StreamsException(
+                String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+                    storeName,
+                    topicPartition),
+                fatalUserException);
+        }
         storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
-
     }
 
     @Override
@@ -77,9 +113,9 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
         internalBatchingRestoreCallback.restoreAll(records);
     }
 
-    void setGlobalRestoreListener(final StateRestoreListener globalRestoreListener) {
-        if (globalRestoreListener != null) {
-            this.globalRestoreListener = globalRestoreListener;
+    void setUserRestoreListener(final StateRestoreListener userRestoreListener) {
+        if (userRestoreListener != null) {
+            this.userRestoreListener = userRestoreListener;
         }
     }
 

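With this change, an exception thrown from any of the three user callbacks is rethrown as a fatal StreamsException instead of propagating raw. A minimal sketch of a user listener, registered through KafkaStreams#setGlobalStateRestoreListener (class name and output are illustrative):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.StateRestoreListener;

    public class LoggingRestoreListener implements StateRestoreListener {
        @Override
        public void onRestoreStart(final TopicPartition topicPartition, final String storeName,
                                   final long startingOffset, final long endingOffset) {
            System.out.printf("Restoring %s (%s) from offset %d to %d%n",
                              storeName, topicPartition, startingOffset, endingOffset);
        }

        @Override
        public void onBatchRestored(final TopicPartition topicPartition, final String storeName,
                                    final long batchEndOffset, final long numRestored) {
            // throwing here is treated as a fatal user code error and wrapped in a StreamsException
        }

        @Override
        public void onRestoreEnd(final TopicPartition topicPartition, final String storeName,
                                 final long totalRestored) {
            System.out.printf("Restored %d records for %s%n", totalRestored, storeName);
        }
    }

Usage, assuming an existing KafkaStreams instance: streams.setGlobalStateRestoreListener(new LoggingRestoreListener());
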
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
index b058844..c9b8ca8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
@@ -16,8 +16,14 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
+
 import java.util.Set;
 
 public interface GlobalStateManager extends StateManager {
+    /**
+     * @throws IllegalStateException if the store gets registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
     Set<String> initialize(InternalProcessorContext processorContext);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index d03425b..10a0775 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -120,7 +120,6 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     }
 
     public void register(final StateStore store,
-                         final boolean ignored,
                          final StateRestoreCallback stateRestoreCallback) {
 
         if (stores.containsKey(store.name())) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 4c2b40f..849af57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -18,7 +18,9 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.StreamsException;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -33,29 +35,34 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
     private final ProcessorTopology topology;
     private final InternalProcessorContext processorContext;
     private final Map<TopicPartition, Long> offsets = new HashMap<>();
-    private final Map<String, SourceNodeRecordDeserializer> deserializers = new HashMap<>();
+    private final Map<String, RecordDeserializer> deserializers = new HashMap<>();
     private final GlobalStateManager stateMgr;
     private final DeserializationExceptionHandler deserializationExceptionHandler;
-
+    private final LogContext logContext;
 
     public GlobalStateUpdateTask(final ProcessorTopology topology,
                                  final InternalProcessorContext processorContext,
                                  final GlobalStateManager stateMgr,
-                                 final DeserializationExceptionHandler deserializationExceptionHandler) {
-
+                                 final DeserializationExceptionHandler deserializationExceptionHandler,
+                                 final LogContext logContext) {
         this.topology = topology;
         this.stateMgr = stateMgr;
         this.processorContext = processorContext;
         this.deserializationExceptionHandler = deserializationExceptionHandler;
+        this.logContext = logContext;
     }
 
+    /**
+     * @throws IllegalStateException if the store gets registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
     public Map<TopicPartition, Long> initialize() {
         final Set<String> storeNames = stateMgr.initialize(processorContext);
         final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();
         for (final String storeName : storeNames) {
             final String sourceTopic = storeNameToTopic.get(storeName);
             final SourceNode source = topology.source(sourceTopic);
-            deserializers.put(sourceTopic, new SourceNodeRecordDeserializer(source, deserializationExceptionHandler));
+            deserializers.put(sourceTopic, new RecordDeserializer(source, deserializationExceptionHandler, logContext));
         }
         initTopology();
         processorContext.initialized();
@@ -66,8 +73,8 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
     @SuppressWarnings("unchecked")
     @Override
     public void update(final ConsumerRecord<byte[], byte[]> record) {
-        final SourceNodeRecordDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
-        final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.tryDeserialize(processorContext, record);
+        final RecordDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
+        final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.deserialize(processorContext, record);
 
         if (deserialized != null) {
             final ProcessorRecordContext recordContext =

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index a365add..1ee49e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -220,6 +220,10 @@ public class GlobalStreamThread extends Thread {
             this.flushInterval = flushInterval;
         }
 
+        /**
+         * @throws IllegalStateException if the store gets registered after initialization has already finished
+         * @throws StreamsException if the store's change log does not contain the partition
+         */
         void initialize() {
             final Map<TopicPartition, Long> partitionOffsets = stateMaintainer.initialize();
             consumer.assign(partitionOffsets.keySet());
@@ -312,7 +316,8 @@ public class GlobalStreamThread extends Thread {
                                                                           streamsMetrics,
                                                                           cache),
                                                                   stateMgr,
-                                                                  config.defaultDeserializationExceptionHandler()),
+                                                                  config.defaultDeserializationExceptionHandler(),
+                                                                  logContext),
                                         time,
                                         config.getLong(StreamsConfig.POLL_MS_CONFIG),
                                         config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 81d2f6c..06405ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -178,8 +178,10 @@ public class InternalTopologyBuilder {
     }
 
     private static class StateStoreSupplierFactory extends AbstractStateStoreFactory {
+        @SuppressWarnings("deprecation")
         private final StateStoreSupplier supplier;
 
+        @SuppressWarnings("deprecation")
         StateStoreSupplierFactory(final StateStoreSupplier<?> supplier) {
             super(supplier.name(),
                   supplier.loggingEnabled(),
@@ -495,6 +497,7 @@ public class InternalTopologyBuilder {
         nodeGrouper.unite(name, predecessorNames);
     }
 
+    @SuppressWarnings("deprecation")
     public final void addStateStore(final StateStoreSupplier supplier,
                                     final String... processorNames) {
         Objects.requireNonNull(supplier, "supplier can't be null");
@@ -527,6 +530,7 @@ public class InternalTopologyBuilder {
         }
     }
 
+    @SuppressWarnings("deprecation")
     public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
                                      final String sourceName,
                                      final TimestampExtractor timestampExtractor,
@@ -1612,8 +1616,8 @@ public class InternalTopologyBuilder {
             return Collections.unmodifiableSet(nodes);
         }
 
-        // only for testing
-        public Iterator<TopologyDescription.Node> nodesInOrder() {
+        // visible for testing
+        Iterator<TopologyDescription.Node> nodesInOrder() {
             return nodes.iterator();
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 2f16547..cc14c67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -41,8 +40,7 @@ import java.util.Map;
 
 public class ProcessorStateManager implements StateManager {
 
-
-    public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
+    private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
     private final Logger log;
@@ -119,17 +117,8 @@ public class ProcessorStateManager implements StateManager {
         return baseDir;
     }
 
-    /**
-     * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name
-     * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name)
-     *
-     * // TODO: parameter loggingEnabled can be removed now
-     *
-     * @throws StreamsException if the store's change log does not contain the partition
-     */
     @Override
     public void register(final StateStore store,
-                         final boolean loggingEnabled,
                          final StateRestoreCallback stateRestoreCallback) {
         log.debug("Registering state store {} to its state manager", store.name());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 4e04108..1b5f764 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -17,7 +17,73 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.slf4j.Logger;
 
-interface RecordDeserializer {
-    ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord);
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
+
+class RecordDeserializer {
+    private final SourceNode sourceNode;
+    private final DeserializationExceptionHandler deserializationExceptionHandler;
+    private final Logger log;
+
+    RecordDeserializer(final SourceNode sourceNode,
+                       final DeserializationExceptionHandler deserializationExceptionHandler,
+                       final LogContext logContext) {
+        this.sourceNode = sourceNode;
+        this.deserializationExceptionHandler = deserializationExceptionHandler;
+        this.log = logContext.logger(RecordDeserializer.class);
+    }
+
+    /**
+     * @throws StreamsException if a deserialization error occurs and the deserialization callback returns
+     * {@link DeserializationExceptionHandler.DeserializationHandlerResponse#FAIL FAIL} or throws an exception itself
+     */
+    @SuppressWarnings("deprecation")
+    ConsumerRecord<Object, Object> deserialize(final ProcessorContext processorContext,
+                                               final ConsumerRecord<byte[], byte[]> rawRecord) {
+
+        try {
+            return new ConsumerRecord<>(
+                rawRecord.topic(),
+                rawRecord.partition(),
+                rawRecord.offset(),
+                rawRecord.timestamp(),
+                TimestampType.CREATE_TIME,
+                rawRecord.checksum(),
+                rawRecord.serializedKeySize(),
+                rawRecord.serializedValueSize(),
+                sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()),
+                sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()));
+        } catch (final Exception deserializationException) {
+            final DeserializationExceptionHandler.DeserializationHandlerResponse response;
+            try {
+                response = deserializationExceptionHandler.handle(processorContext, rawRecord, deserializationException);
+            } catch (final Exception fatalUserException) {
+                log.error("Deserialization error callback failed after deserialization error for record {}",
+                          rawRecord,
+                          deserializationException);
+                throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
+            }
+
+            if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) {
+                throw new StreamsException("Deserialization exception handler is set to fail upon" +
+                    " a deserialization error. If you would rather have the streaming pipeline" +
+                    " continue after a deserialization error, please set the " +
+                    DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
+                    deserializationException);
+            } else {
+                sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
+            }
+        }
+        return null;
+    }
+
+    SourceNode sourceNode() {
+        return sourceNode;
+    }
 }

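The reworked deserialize() gives the configured DeserializationExceptionHandler the last word on a corrupted record: CONTINUE skips it (recording the skip metric), while FAIL, or an exception from the handler itself, becomes a StreamsException. A minimal sketch of a skipping handler (class name is illustrative):

    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class LogAndSkipHandler implements DeserializationExceptionHandler {
        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            System.err.printf("Skipping corrupted record %s-%d@%d: %s%n",
                              record.topic(), record.partition(), record.offset(), exception);
            return DeserializationHandlerResponse.CONTINUE;  // FAIL would throw a StreamsException
        }

        @Override
        public void configure(final Map<String, ?> configs) {}
    }

It would be installed via the default.deserialization.exception.handler config named in the error message above.
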
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 889b6d8..e6facaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -18,11 +18,12 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
 
@@ -33,16 +34,14 @@ import java.util.ArrayDeque;
  * timestamp is monotonically increasing such that once it is advanced, it will not be decremented.
  */
 public class RecordQueue {
-
-    private static final Logger log = LoggerFactory.getLogger(RecordQueue.class);
-
     private final SourceNode source;
     private final TimestampExtractor timestampExtractor;
     private final TopicPartition partition;
     private final ArrayDeque<StampedRecord> fifoQueue;
     private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
-    private final SourceNodeRecordDeserializer recordDeserializer;
+    private final RecordDeserializer recordDeserializer;
     private final ProcessorContext processorContext;
+    private final Logger log;
 
     private long partitionTime = TimestampTracker.NOT_KNOWN;
 
@@ -50,14 +49,16 @@ public class RecordQueue {
                 final SourceNode source,
                 final TimestampExtractor timestampExtractor,
                 final DeserializationExceptionHandler deserializationExceptionHandler,
-                final ProcessorContext processorContext) {
+                final ProcessorContext processorContext,
+                final LogContext logContext) {
         this.partition = partition;
         this.source = source;
         this.timestampExtractor = timestampExtractor;
         this.fifoQueue = new ArrayDeque<>();
         this.timeTracker = new MinTimestampTracker<>();
-        this.recordDeserializer = new SourceNodeRecordDeserializer(source, deserializationExceptionHandler);
+        this.recordDeserializer = new RecordDeserializer(source, deserializationExceptionHandler, logContext);
         this.processorContext = processorContext;
+        this.log = logContext.logger(RecordQueue.class);
     }
 
     /**
@@ -87,12 +88,21 @@ public class RecordQueue {
     int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
         for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
 
-            final ConsumerRecord<Object, Object> record = recordDeserializer.tryDeserialize(processorContext, rawRecord);
+            final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord);
             if (record == null) {
                 continue;
             }
 
-            final long timestamp = timestampExtractor.extract(record, timeTracker.get());
+            final long timestamp;
+            try {
+                timestamp = timestampExtractor.extract(record, timeTracker.get());
+            } catch (final StreamsException internalFatalExtractorException) {
+                throw internalFatalExtractorException;
+            } catch (final Exception fatalUserException) {
+                throw new StreamsException(
+                    String.format("Fatal user code error in TimestampExtractor callback for record %s.", record),
+                    fatalUserException);
+            }
             log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record);
 
             // drop message if TS is invalid, i.e., negative

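Any exception escaping a user TimestampExtractor is now wrapped in a StreamsException naming the offending record, rather than propagating raw. A minimal sketch of a custom extractor, assuming a hypothetical convention that the record value is an epoch-millis Long:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class PayloadTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
            final Object value = record.value();
            if (value instanceof Long) {  // assumed payload convention, see lead-in
                return (Long) value;
            }
            // a negative timestamp makes RecordQueue drop the record (see the hunk above);
            // throwing here instead would surface as "Fatal user code error in TimestampExtractor callback"
            return -1L;
        }
    }
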
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
deleted file mode 100644
index 7fde881..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import static java.lang.String.format;
-import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
-
-class SourceNodeRecordDeserializer implements RecordDeserializer {
-    private final SourceNode sourceNode;
-    private final DeserializationExceptionHandler deserializationExceptionHandler;
-
-    SourceNodeRecordDeserializer(final SourceNode sourceNode,
-                                 final DeserializationExceptionHandler deserializationExceptionHandler) {
-        this.sourceNode = sourceNode;
-        this.deserializationExceptionHandler = deserializationExceptionHandler;
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    public ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord) {
-        final Object key;
-        try {
-            key = sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key());
-        } catch (Exception e) {
-            throw new StreamsException(format("Failed to deserialize key for record. topic=%s, partition=%d, offset=%d",
-                                              rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e);
-        }
-
-        final Object value;
-        try {
-            value = sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value());
-        } catch (Exception e) {
-            throw new StreamsException(format("Failed to deserialize value for record. topic=%s, partition=%d, offset=%d",
-                                              rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e);
-        }
-
-        return new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(),
-                                    rawRecord.timestamp(), TimestampType.CREATE_TIME,
-                                    rawRecord.checksum(),
-                                    rawRecord.serializedKeySize(),
-                                    rawRecord.serializedValueSize(), key, value);
-
-    }
-
-    public ConsumerRecord<Object, Object> tryDeserialize(final ProcessorContext processorContext,
-                                                         final ConsumerRecord<byte[], byte[]> rawRecord) {
-
-        // catch and process if we have a deserialization handler
-        try {
-            return deserialize(rawRecord);
-        } catch (final Exception e) {
-            final DeserializationExceptionHandler.DeserializationHandlerResponse response =
-                    deserializationExceptionHandler.handle(processorContext, rawRecord, e);
-            if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) {
-                throw new StreamsException("Deserialization exception handler is set to fail upon" +
-                        " a deserialization error. If you would rather have the streaming pipeline" +
-                        " continue after a deserialization error, please set the " +
-                        DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
-                        e);
-            } else {
-                sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
-            }
-        }
-        return null;
-    }
-
-    public SourceNode sourceNode() {
-        return sourceNode;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
index 511280d..2a8d9a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 
@@ -27,7 +28,12 @@ import java.util.Map;
 interface StateManager extends Checkpointable {
     File baseDir();
 
-    void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback);
+    /**
+     * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name
+     * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name)
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void register(final StateStore store, final StateRestoreCallback stateRestoreCallback);
 
     void flush();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 579561f..33dce9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -79,8 +79,8 @@ public class StateRestorer {
         return persistent;
     }
 
-    void setGlobalRestoreListener(StateRestoreListener globalStateRestoreListener) {
-        this.compositeRestoreListener.setGlobalRestoreListener(globalStateRestoreListener);
+    void setUserRestoreListener(StateRestoreListener userRestoreListener) {
+        this.compositeRestoreListener.setUserRestoreListener(userRestoreListener);
     }
 
     void setRestoredOffset(final long restoredOffset) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index caa0100..4ba860d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -41,7 +41,7 @@ public class StoreChangelogReader implements ChangelogReader {
 
     private final Logger log;
     private final Consumer<byte[], byte[]> consumer;
-    private final StateRestoreListener stateRestoreListener;
+    private final StateRestoreListener userStateRestoreListener;
     private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
     private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
@@ -50,11 +50,11 @@ public class StoreChangelogReader implements ChangelogReader {
 
     public StoreChangelogReader(final String threadId,
                                 final Consumer<byte[], byte[]> consumer,
-                                final StateRestoreListener stateRestoreListener,
+                                final StateRestoreListener userStateRestoreListener,
                                 final LogContext logContext) {
         this.consumer = consumer;
         this.log = logContext.logger(getClass());
-        this.stateRestoreListener = stateRestoreListener;
+        this.userStateRestoreListener = userStateRestoreListener;
     }
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> consumer,
@@ -65,7 +65,7 @@ public class StoreChangelogReader implements ChangelogReader {
 
     @Override
     public void register(final StateRestorer restorer) {
-        restorer.setGlobalRestoreListener(stateRestoreListener);
+        restorer.setUserRestoreListener(userStateRestoreListener);
         stateRestorers.put(restorer.partition(), restorer);
         needsInitializing.put(restorer.partition(), restorer);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6775edb..8c26fa9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -136,7 +136,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         for (final TopicPartition partition : partitions) {
             final SourceNode source = topology.source(partition.topic());
             final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor;
-            final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, processorContext);
+            final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, processorContext, logContext);
             partitionQueues.put(partition, queue);
         }
 
@@ -151,6 +151,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
     }
 
+    @Override
     public boolean initialize() {
         log.trace("Initializing");
         initializeStateStores();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e141c46..8d13558 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -641,7 +641,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                                       final StreamsMetadataState streamsMetadataState,
                                       final long cacheSizeBytes,
                                       final StateDirectory stateDirectory,
-                                      final StateRestoreListener stateRestoreListener) {
+                                      final StateRestoreListener userStateRestoreListener) {
 
         final String threadClientId = clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement();
         final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(metrics,
@@ -666,8 +666,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs);
         final StoreChangelogReader changelogReader = new StoreChangelogReader(threadClientId,
                                                                               restoreConsumer,
-                                                                              stateRestoreListener,
-                                                                                logContext);
+                                                                              userStateRestoreListener,
+                                                                              logContext);
 
         Producer<byte[], byte[]> threadProducer = null;
         if (!eosEnabled) {
@@ -757,6 +757,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
     /**
      * Main event loop for polling and processing records through topologies.
+     * @throws IllegalStateException if the store gets registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
      */
     private void runLoop() {
         long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
@@ -767,6 +769,10 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         }
     }
 
+    /**
+     * @throws IllegalStateException if the store gets registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
     // Visible for testing
     long runOnce(final long recordsProcessedBeforeCommit) {
         long processedBeforeCommit = recordsProcessedBeforeCommit;

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index a481c31..80e5423 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -71,6 +72,8 @@ public interface Task {
     /**
      * initialize the task and return true if the task is ready to run, i.e., it has no state stores
      * @return true if this task has no state stores that may need restoring.
+     * @throws IllegalStateException if the store gets registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
      */
     boolean initialize();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index f12ed91..278957e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -239,7 +239,10 @@ class TaskManager {
         this.consumer = consumer;
     }
 
-
+    /**
+     * @throws IllegalStateException if the store gets registered after initialization has already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
     boolean updateNewAndRestoringTasks() {
         active.initializeNewTasks();
         standby.initializeNewTasks();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 7e24969..929d584 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -72,7 +72,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
         if (root != null) {
             // register the store
-            context.register(root, true, new StateRestoreCallback() {
+            context.register(root, false, new StateRestoreCallback() {
                 @Override
                 public void restore(byte[] key, byte[] value) {
                     // this is a delete

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index beb9ce1..f1aa96f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -108,7 +108,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
             valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         // register the store
-        context.register(root, true, new StateRestoreCallback() {
+        context.register(root, false, new StateRestoreCallback() {
             @Override
             public void restore(byte[] key, byte[] value) {
                 restoring = true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index ded2732..7786348 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -121,7 +121,7 @@ public class TopologyBuilderTest {
         final Serde<String> stringSerde = Serdes.String();
 
         try {
-            builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), new String[]{});
+            builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer());
             fail("Should throw TopologyBuilderException with no topics");
         } catch (TopologyBuilderException tpe) {
             //no-op

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
index 88aba94..e2ea668 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
@@ -124,7 +124,7 @@ public class CompositeRestoreListenerTest {
     @Test
     public void shouldHandleNullReportStoreListener() {
         compositeRestoreListener = new CompositeRestoreListener(batchingStateRestoreCallback);
-        compositeRestoreListener.setGlobalRestoreListener(null);
+        compositeRestoreListener.setUserRestoreListener(null);
 
         compositeRestoreListener.restoreAll(records);
         compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
@@ -138,7 +138,7 @@ public class CompositeRestoreListenerTest {
     @Test
     public void shouldHandleNoRestoreListener() {
         compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
-        compositeRestoreListener.setGlobalRestoreListener(null);
+        compositeRestoreListener.setUserRestoreListener(null);
 
         compositeRestoreListener.restoreAll(records);
         compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
@@ -151,7 +151,7 @@ public class CompositeRestoreListenerTest {
     @Test(expected = UnsupportedOperationException.class)
     public void shouldThrowExceptionWhenSinglePutDirectlyCalled() {
         compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
-        compositeRestoreListener.setGlobalRestoreListener(null);
+        compositeRestoreListener.setUserRestoreListener(null);
 
         compositeRestoreListener.restore(key, value);
     }
@@ -179,7 +179,7 @@ public class CompositeRestoreListenerTest {
 
     private void setUpCompositeRestoreListener(StateRestoreCallback stateRestoreCallback) {
         compositeRestoreListener = new CompositeRestoreListener(stateRestoreCallback);
-        compositeRestoreListener.setGlobalRestoreListener(reportingStoreListener);
+        compositeRestoreListener.setUserRestoreListener(reportingStoreListener);
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index b438347..0519fb0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -166,7 +166,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize(context);
 
         try {
-            stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), false, new TheStateRestoreCallback());
+            stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), new TheStateRestoreCallback());
             fail("should have raised an illegal argument exception as store is not in the topology");
         } catch (final IllegalArgumentException e) {
             // pass
@@ -177,9 +177,9 @@ public class GlobalStateManagerImplTest {
     public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() {
         stateManager.initialize(context);
         initializeConsumer(2, 1, t1);
-        stateManager.register(store1, false, new TheStateRestoreCallback());
+        stateManager.register(store1, new TheStateRestoreCallback());
         try {
-            stateManager.register(store1, false, new TheStateRestoreCallback());
+            stateManager.register(store1, new TheStateRestoreCallback());
             fail("should have raised an illegal argument exception as store has already been registered");
         } catch (final IllegalArgumentException e) {
             // pass
@@ -190,7 +190,7 @@ public class GlobalStateManagerImplTest {
     public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() {
         stateManager.initialize(context);
         try {
-            stateManager.register(store1, false, new TheStateRestoreCallback());
+            stateManager.register(store1, new TheStateRestoreCallback());
             fail("Should have raised a StreamsException as there are no partition for the store");
         } catch (final StreamsException e) {
             // pass
@@ -204,7 +204,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize(context);
 
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         assertEquals(2, stateRestoreCallback.restored.size());
     }
 
@@ -236,7 +236,7 @@ public class GlobalStateManagerImplTest {
 
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         assertEquals(5, stateRestoreCallback.restored.size());
     }
 
@@ -247,9 +247,9 @@ public class GlobalStateManagerImplTest {
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         // register the stores
         initializeConsumer(1, 1, t1);
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         initializeConsumer(1, 1, t2);
-        stateManager.register(store2, false, stateRestoreCallback);
+        stateManager.register(store2, stateRestoreCallback);
 
         stateManager.flush();
         assertTrue(store1.flushed);
@@ -267,7 +267,7 @@ public class GlobalStateManagerImplTest {
             public void flush() {
                 throw new RuntimeException("KABOOM!");
             }
-        }, false, stateRestoreCallback);
+        }, stateRestoreCallback);
 
         stateManager.flush();
     }
@@ -278,9 +278,9 @@ public class GlobalStateManagerImplTest {
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         // register the stores
         initializeConsumer(1, 1, t1);
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         initializeConsumer(1, 1, t2);
-        stateManager.register(store2, false, stateRestoreCallback);
+        stateManager.register(store2, stateRestoreCallback);
 
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
         assertFalse(store1.isOpen());
@@ -292,7 +292,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         initializeConsumer(1, 1, t1);
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L);
         stateManager.close(expected);
         final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
@@ -308,7 +308,7 @@ public class GlobalStateManagerImplTest {
             public void close() {
                 throw new RuntimeException("KABOOM!");
             }
-        }, false, stateRestoreCallback);
+        }, stateRestoreCallback);
 
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
     }
@@ -317,7 +317,7 @@ public class GlobalStateManagerImplTest {
     public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() {
         stateManager.initialize(context);
         try {
-            stateManager.register(store1, false, null);
+            stateManager.register(store1, null);
             fail("should have thrown due to null callback");
         } catch (IllegalArgumentException e) {
             // pass
@@ -349,7 +349,7 @@ public class GlobalStateManagerImplTest {
                 }
                 super.close();
             }
-        }, false, stateRestoreCallback);
+        }, stateRestoreCallback);
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
 
 
@@ -368,9 +368,9 @@ public class GlobalStateManagerImplTest {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        stateManager.register(store, false, stateRestoreCallback);
+        stateManager.register(store, stateRestoreCallback);
 
-        stateManager.register(store2, false, stateRestoreCallback);
+        stateManager.register(store2, stateRestoreCallback);
 
         try {
             stateManager.close(Collections.<TopicPartition, Long>emptyMap());
@@ -415,9 +415,9 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         initializeConsumer(10, 1, t1);
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         initializeConsumer(20, 1, t2);
-        stateManager.register(store2, false, stateRestoreCallback);
+        stateManager.register(store2, stateRestoreCallback);
 
         final Map<TopicPartition, Long> initialCheckpoint = stateManager.checkpointed();
         stateManager.checkpoint(Collections.singletonMap(t1, 101L));
@@ -444,7 +444,7 @@ public class GlobalStateManagerImplTest {
 
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         final KeyValue<byte[], byte[]> restoredKv = stateRestoreCallback.restored.get(0);
         assertThat(stateRestoreCallback.restored, equalTo(Collections.singletonList(KeyValue.pair(restoredKv.key, restoredKv.value))));
     }
@@ -454,7 +454,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         initializeConsumer(10, 1, t1);
-        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.register(store1, stateRestoreCallback);
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
 
         final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed();

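Throughout this file the boolean loggingEnabled flag has been dropped from register(...), leaving the two-argument register(StateStore, StateRestoreCallback). A minimal sketch of the new call shape with an inline callback (store1 and stateManager are the test's own fields; the callback body is illustrative):

    import org.apache.kafka.streams.processor.StateRestoreCallback;

    // The only method a StateRestoreCallback must provide is
    // restore(byte[], byte[]), invoked once per restored record.
    final StateRestoreCallback callback = new StateRestoreCallback() {
        @Override
        public void restore(final byte[] key, final byte[] value) {
            // apply the restored key/value pair to the underlying store
        }
    };
    stateManager.register(store1, callback);
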
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 4ece443..63783a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
@@ -51,6 +52,7 @@ import static org.junit.Assert.fail;
 
 public class GlobalStateTaskTest {
 
+    private final LogContext logContext = new LogContext();
     private Map<TopicPartition, Long> offsets;
     private GlobalStateUpdateTask globalStateTask;
     private GlobalStateManagerStub stateMgr;
@@ -92,7 +94,7 @@ public class GlobalStateTaskTest {
         offsets.put(t1, 50L);
         offsets.put(t2, 100L);
         stateMgr = new GlobalStateManagerStub(storeNames, offsets);
-        globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, new LogAndFailExceptionHandler());
+        globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, new LogAndFailExceptionHandler(), logContext);
     }
 
     @Test
@@ -175,8 +177,12 @@ public class GlobalStateTaskTest {
 
     @Test
     public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() throws Exception {
-        final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr,
-            new LogAndContinueExceptionHandler());
+        final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(
+            topology,
+            context,
+            stateMgr,
+            new LogAndContinueExceptionHandler(),
+            logContext);
         final byte[] key = new LongSerializer().serialize("t2", 1L);
         final byte[] recordValue = new IntegerSerializer().serialize("t2", 10);
 
@@ -185,8 +191,12 @@ public class GlobalStateTaskTest {
 
     @Test
     public void shouldNotThrowStreamsExceptionWhenValueDeserializationFails() throws Exception {
-        final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr,
-            new LogAndContinueExceptionHandler());
+        final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(
+            topology,
+            context,
+            stateMgr,
+            new LogAndContinueExceptionHandler(),
+            logContext);
         final byte[] key = new IntegerSerializer().serialize("t2", 1);
         final byte[] recordValue = new LongSerializer().serialize("t2", 10L);
 

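GlobalStateUpdateTask now takes a LogContext so every line it logs carries a common prefix. A sketch of the pattern, assuming the org.apache.kafka.common.utils.LogContext API (the prefix string is illustrative):

    import org.apache.kafka.common.utils.LogContext;
    import org.slf4j.Logger;

    // Every logger created from this context prepends the given prefix,
    // which makes interleaved task logs attributable.
    final LogContext logContext = new LogContext("[global-task] ");
    final Logger log = logContext.logger(GlobalStateTaskTest.class);
    log.info("initialized");  // logged as: [global-task] initialized
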
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 95636ec..e223699 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -295,6 +295,7 @@ public class InternalTopologyBuilderTest {
         } catch (final TopologyException expected) { /* ok */ }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testAddStateStore() {
         final StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
@@ -344,6 +345,7 @@ public class InternalTopologyBuilderTest {
         assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testTopicGroupsByStateStore() {
         builder.setApplicationId("X");
@@ -470,6 +472,7 @@ public class InternalTopologyBuilderTest {
         builder.setApplicationId(null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAddNullStateStoreSupplier() throws Exception {
         builder.addStateStore((StateStoreSupplier) null);

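The @SuppressWarnings("deprecation") annotations above acknowledge that these tests still exercise the deprecated StateStoreSupplier path. For comparison, a sketch of the non-deprecated StoreBuilder route (KIP-182 style; assuming the org.apache.kafka.streams.state.Stores factory methods of this era):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    // Builds a persistent key-value store named "store-1"; the builder,
    // rather than a StateStoreSupplier, is the preferred argument to
    // addStateStore going forward.
    final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("store-1"),
            Serdes.String(),
            Serdes.String());
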
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index d9f38eb..00a2c35 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.test.MockSourceNode;
@@ -35,14 +36,27 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 
 public class PartitionGroupTest {
+    private final LogContext logContext = new LogContext();
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
     private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
     private final String[] topics = {"topic"};
     private final TopicPartition partition1 = new TopicPartition(topics[0], 1);
     private final TopicPartition partition2 = new TopicPartition(topics[0], 2);
-    private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null);
-    private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null);
+    private final RecordQueue queue1 = new RecordQueue(
+        partition1,
+        new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+        timestampExtractor,
+        new LogAndContinueExceptionHandler(),
+        null,
+        logContext);
+    private final RecordQueue queue2 = new RecordQueue(
+        partition2,
+        new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+        timestampExtractor,
+        new LogAndContinueExceptionHandler(),
+        null,
+        logContext);
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
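
Both queues above are wired with LogAndContinueExceptionHandler directly; in user code the handler is chosen through configuration instead. A sketch, assuming the StreamsConfig key introduced with KIP-161:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

    // Records that fail deserialization are logged and skipped rather
    // than failing the task.
    final Properties props = new Properties();
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
              LogAndContinueExceptionHandler.class);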