You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/05/18 21:51:32 UTC

[kafka] branch trunk updated: MINOR: consolidate processor context for active/standby (#8669)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 392e49b  MINOR: consolidate processor context for active/standby (#8669)
392e49b is described below

commit 392e49b1eddd2fcd8d09177cc80abc8a51f5c991
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Mon May 18 14:50:54 2020 -0700

    MINOR: consolidate processor context for active/standby (#8669)
    
    This is a prerequisite for KAFKA-9501 and will also be useful for KAFKA-9603
    
    There should be no logical changes here: the main difference is the removal of StandbyContextImpl in preparation for contexts to transition between active and standby.
    
    Also includes some minor cleanup, eg pulling the ReadOnly/ReadWrite decorators out into a separate file.
    
    Reviewers: Bruno Cadonna <br...@confluent.io>, John Roesler <vv...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
 .../internals/AbstractProcessorContext.java        |  10 +-
 .../internals/AbstractReadOnlyDecorator.java       | 252 ++++++++++
 .../internals/AbstractReadWriteDecorator.java      | 248 ++++++++++
 .../internals/GlobalProcessorContextImpl.java      |  37 +-
 .../internals/GlobalStateManagerImpl.java          |   7 +-
 .../internals/InternalProcessorContext.java        |  19 +-
 .../processor/internals/ProcessorContextImpl.java  | 548 +++++----------------
 .../processor/internals/ProcessorStateManager.java |   4 +-
 .../processor/internals/StandbyContextImpl.java    | 190 -------
 .../streams/processor/internals/StandbyTask.java   |   2 +-
 .../streams/processor/internals/StateManager.java  |   4 +-
 .../state/internals/CachingKeyValueStore.java      |   2 +-
 .../state/internals/CachingSessionStore.java       |   2 +-
 .../state/internals/CachingWindowStore.java        |   2 +-
 .../internals/ChangeLoggingKeyValueBytesStore.java |  14 +-
 .../internals/ChangeLoggingSessionBytesStore.java  |  20 +-
 ...ChangeLoggingTimestampedKeyValueBytesStore.java |   4 +-
 .../ChangeLoggingTimestampedWindowBytesStore.java  |   4 +-
 .../internals/ChangeLoggingWindowBytesStore.java   |  17 +-
 .../streams/state/internals/StoreChangeLogger.java |  71 ---
 .../internals/AbstractProcessorContextTest.java    |  12 +-
 .../internals/GlobalProcessorContextImplTest.java  |   2 +
 .../internals/ProcessorContextImplTest.java        | 203 +++++++-
 .../processor/internals/ProcessorContextTest.java  |   9 +-
 .../processor/internals/StandbyTaskTest.java       |   5 +-
 .../processor/internals/StateManagerStub.java      |   6 +
 .../streams/state/KeyValueStoreTestDriver.java     |   2 +-
 .../ChangeLoggingSessionBytesStoreTest.java        |  29 +-
 ...angeLoggingTimestampedWindowBytesStoreTest.java |  37 +-
 .../ChangeLoggingWindowBytesStoreTest.java         |  40 +-
 .../state/internals/StoreChangeLoggerTest.java     |  84 ----
 .../apache/kafka/test/GlobalStateManagerStub.java  |   6 +
 .../kafka/test/InternalMockProcessorContext.java   |  23 +
 .../kafka/test/MockInternalProcessorContext.java   |  17 +-
 .../apache/kafka/test/NoOpProcessorContext.java    |  17 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   2 +-
 .../streams/processor/MockProcessorContext.java    |   1 +
 37 files changed, 1020 insertions(+), 932 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index e684344..1132708 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -30,7 +31,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-
 public abstract class AbstractProcessorContext implements InternalProcessorContext {
 
     public static final String NONEXIST_TOPIC = "__null_topic__";
@@ -138,6 +138,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
         if (recordContext == null) {
             throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
         }
+
         return recordContext.partition();
     }
 
@@ -205,7 +206,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     }
 
     @Override
-    public ThreadCache getCache() {
+    public ThreadCache cache() {
         return cache;
     }
 
@@ -218,4 +219,9 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     public void uninitialize() {
         initialized = false;
     }
+
+    @Override
+    public TaskType taskType() {
+        return stateManager.taskType();
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
new file mode 100644
index 0000000..a63cd99
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.List;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+abstract class AbstractReadOnlyDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> {
+
+    static final String ERROR_MESSAGE = "Global store is read only";
+
+    private AbstractReadOnlyDecorator(final T inner) {
+        super(inner);
+    }
+
+    @Override
+    public void flush() {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public void close() {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    static StateStore getReadOnlyStore(final StateStore global) {
+        if (global instanceof TimestampedKeyValueStore) {
+            return new TimestampedKeyValueStoreReadOnlyDecorator<>((TimestampedKeyValueStore<?, ?>) global);
+        } else if (global instanceof KeyValueStore) {
+            return new KeyValueStoreReadOnlyDecorator<>((KeyValueStore<?, ?>) global);
+        } else if (global instanceof TimestampedWindowStore) {
+            return new TimestampedWindowStoreReadOnlyDecorator<>((TimestampedWindowStore<?, ?>) global);
+        } else if (global instanceof WindowStore) {
+            return new WindowStoreReadOnlyDecorator<>((WindowStore<?, ?>) global);
+        } else if (global instanceof SessionStore) {
+            return new SessionStoreReadOnlyDecorator<>((SessionStore<?, ?>) global);
+        } else {
+            return global;
+        }
+    }
+
+    static class KeyValueStoreReadOnlyDecorator<K, V>
+        extends AbstractReadOnlyDecorator<KeyValueStore<K, V>, K, V>
+        implements KeyValueStore<K, V> {
+
+        private KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) {
+            super(inner);
+        }
+
+        @Override
+        public V get(final K key) {
+            return wrapped().get(key);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(final K from,
+                                            final K to) {
+            return wrapped().range(from, to);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return wrapped().all();
+        }
+
+        @Override
+        public long approximateNumEntries() {
+            return wrapped().approximateNumEntries();
+        }
+
+        @Override
+        public void put(final K key,
+                        final V value) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public V putIfAbsent(final K key,
+                             final V value) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public void putAll(final List<KeyValue<K, V>> entries) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public V delete(final K key) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+    }
+
+    static class TimestampedKeyValueStoreReadOnlyDecorator<K, V>
+        extends KeyValueStoreReadOnlyDecorator<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore<K, V> inner) {
+            super(inner);
+        }
+    }
+
+    static class WindowStoreReadOnlyDecorator<K, V>
+        extends AbstractReadOnlyDecorator<WindowStore<K, V>, K, V>
+        implements WindowStore<K, V> {
+
+        private WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
+            super(inner);
+        }
+
+        @Deprecated
+        @Override
+        public void put(final K key,
+                        final V value) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public void put(final K key,
+                        final V value,
+                        final long windowStartTimestamp) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public V fetch(final K key,
+                       final long time) {
+            return wrapped().fetch(key, time);
+        }
+
+        @Override
+        @Deprecated
+        public WindowStoreIterator<V> fetch(final K key,
+                                            final long timeFrom,
+                                            final long timeTo) {
+            return wrapped().fetch(key, timeFrom, timeTo);
+        }
+
+        @Override
+        @Deprecated
+        public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                      final K to,
+                                                      final long timeFrom,
+                                                      final long timeTo) {
+            return wrapped().fetch(from, to, timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> all() {
+            return wrapped().all();
+        }
+
+        @Override
+        @Deprecated
+        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+                                                         final long timeTo) {
+            return wrapped().fetchAll(timeFrom, timeTo);
+        }
+    }
+
+    static class TimestampedWindowStoreReadOnlyDecorator<K, V>
+        extends WindowStoreReadOnlyDecorator<K, ValueAndTimestamp<V>>
+        implements TimestampedWindowStore<K, V> {
+
+        private TimestampedWindowStoreReadOnlyDecorator(final TimestampedWindowStore<K, V> inner) {
+            super(inner);
+        }
+    }
+
+    static class SessionStoreReadOnlyDecorator<K, AGG>
+        extends AbstractReadOnlyDecorator<SessionStore<K, AGG>, K, AGG>
+        implements SessionStore<K, AGG> {
+
+        private SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) {
+            super(inner);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+                                                               final long earliestSessionEndTime,
+                                                               final long latestSessionStartTime) {
+            return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
+                                                               final K keyTo,
+                                                               final long earliestSessionEndTime,
+                                                               final long latestSessionStartTime) {
+            return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
+        }
+
+        @Override
+        public void remove(final Windowed<K> sessionKey) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public void put(final Windowed<K> sessionKey,
+                        final AGG aggregate) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public AGG fetchSession(final K key, final long startTime, final long endTime) {
+            return wrapped().fetchSession(key, startTime, endTime);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+            return wrapped().fetch(key);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
+                                                        final K to) {
+            return wrapped().fetch(from, to);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
new file mode 100644
index 0000000..494d98e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.List;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> {
+    static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";
+
+    private AbstractReadWriteDecorator(final T inner) {
+        super(inner);
+    }
+
+    @Override
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    @Override
+    public void close() {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);
+    }
+
+    static StateStore getReadWriteStore(final StateStore store) {
+        if (store instanceof TimestampedKeyValueStore) {
+            return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store);
+        } else if (store instanceof KeyValueStore) {
+            return new KeyValueStoreReadWriteDecorator<>((KeyValueStore<?, ?>) store);
+        } else if (store instanceof TimestampedWindowStore) {
+            return new TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>) store);
+        } else if (store instanceof WindowStore) {
+            return new WindowStoreReadWriteDecorator<>((WindowStore<?, ?>) store);
+        } else if (store instanceof SessionStore) {
+            return new SessionStoreReadWriteDecorator<>((SessionStore<?, ?>) store);
+        } else {
+            return store;
+        }
+    }
+
+    static class KeyValueStoreReadWriteDecorator<K, V>
+        extends AbstractReadWriteDecorator<KeyValueStore<K, V>, K, V>
+        implements KeyValueStore<K, V> {
+
+        KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
+            super(inner);
+        }
+
+        @Override
+        public V get(final K key) {
+            return wrapped().get(key);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(final K from,
+                                            final K to) {
+            return wrapped().range(from, to);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return wrapped().all();
+        }
+
+        @Override
+        public long approximateNumEntries() {
+            return wrapped().approximateNumEntries();
+        }
+
+        @Override
+        public void put(final K key,
+                        final V value) {
+            wrapped().put(key, value);
+        }
+
+        @Override
+        public V putIfAbsent(final K key,
+                             final V value) {
+            return wrapped().putIfAbsent(key, value);
+        }
+
+        @Override
+        public void putAll(final List<KeyValue<K, V>> entries) {
+            wrapped().putAll(entries);
+        }
+
+        @Override
+        public V delete(final K key) {
+            return wrapped().delete(key);
+        }
+    }
+
+    static class TimestampedKeyValueStoreReadWriteDecorator<K, V>
+        extends KeyValueStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
+        implements TimestampedKeyValueStore<K, V> {
+
+        TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) {
+            super(inner);
+        }
+    }
+
+    static class WindowStoreReadWriteDecorator<K, V>
+        extends AbstractReadWriteDecorator<WindowStore<K, V>, K, V>
+        implements WindowStore<K, V> {
+
+        WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
+            super(inner);
+        }
+
+        @Deprecated
+        @Override
+        public void put(final K key,
+                        final V value) {
+            wrapped().put(key, value);
+        }
+
+        @Override
+        public void put(final K key,
+                        final V value,
+                        final long windowStartTimestamp) {
+            wrapped().put(key, value, windowStartTimestamp);
+        }
+
+        @Override
+        public V fetch(final K key,
+                       final long time) {
+            return wrapped().fetch(key, time);
+        }
+
+        @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
+        @Override
+        public WindowStoreIterator<V> fetch(final K key,
+                                            final long timeFrom,
+                                            final long timeTo) {
+            return wrapped().fetch(key, timeFrom, timeTo);
+        }
+
+        @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                      final K to,
+                                                      final long timeFrom,
+                                                      final long timeTo) {
+            return wrapped().fetch(from, to, timeFrom, timeTo);
+        }
+
+        @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+                                                         final long timeTo) {
+            return wrapped().fetchAll(timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> all() {
+            return wrapped().all();
+        }
+    }
+
+    static class TimestampedWindowStoreReadWriteDecorator<K, V>
+        extends WindowStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
+        implements TimestampedWindowStore<K, V> {
+
+        TimestampedWindowStoreReadWriteDecorator(final TimestampedWindowStore<K, V> inner) {
+            super(inner);
+        }
+    }
+
+    static class SessionStoreReadWriteDecorator<K, AGG>
+        extends AbstractReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
+        implements SessionStore<K, AGG> {
+
+        SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
+            super(inner);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+                                                               final long earliestSessionEndTime,
+                                                               final long latestSessionStartTime) {
+            return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
+                                                               final K keyTo,
+                                                               final long earliestSessionEndTime,
+                                                               final long latestSessionStartTime) {
+            return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
+        }
+
+        @Override
+        public void remove(final Windowed<K> sessionKey) {
+            wrapped().remove(sessionKey);
+        }
+
+        @Override
+        public void put(final Windowed<K> sessionKey,
+                        final AGG aggregate) {
+            wrapped().put(sessionKey, aggregate);
+        }
+
+        @Override
+        public AGG fetchSession(final K key,
+                                final long startTime,
+                                final long endTime) {
+            return wrapped().fetchSession(key, startTime, endTime);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+            return wrapped().fetch(key);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
+                                                        final K to) {
+            return wrapped().fetch(from, to);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 859430c..81169d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
+
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
@@ -23,24 +26,13 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.KeyValueStoreReadWriteDecorator;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.SessionStoreReadWriteDecorator;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.TimestampedKeyValueStoreReadWriteDecorator;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.TimestampedWindowStoreReadWriteDecorator;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.WindowStoreReadWriteDecorator;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.time.Duration;
 
 public class GlobalProcessorContextImpl extends AbstractProcessorContext {
 
-
     public GlobalProcessorContextImpl(final StreamsConfig config,
                                       final StateManager stateMgr,
                                       final StreamsMetricsImpl metrics,
@@ -51,20 +43,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
     @Override
     public StateStore getStateStore(final String name) {
         final StateStore store = stateManager.getGlobalStore(name);
-
-        if (store instanceof TimestampedKeyValueStore) {
-            return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store);
-        } else if (store instanceof KeyValueStore) {
-            return new KeyValueStoreReadWriteDecorator<>((KeyValueStore<?, ?>) store);
-        } else if (store instanceof TimestampedWindowStore) {
-            return new TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>) store);
-        } else if (store instanceof WindowStore) {
-            return new WindowStoreReadWriteDecorator<>((WindowStore<?, ?>) store);
-        } else if (store instanceof SessionStore) {
-            return new SessionStoreReadWriteDecorator<>((SessionStore<?, ?>) store);
-        }
-
-        return store;
+        return getReadWriteStore(store);
     }
 
     @SuppressWarnings("unchecked")
@@ -130,4 +109,12 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
     public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
     }
+
+    @Override
+    public void logChange(final String storeName,
+                          final Bytes key,
+                          final byte[] value,
+                          final long timestamp) {
+        throw new UnsupportedOperationException("this should not happen: logChange() not supported in global processor context.");
+    }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f17131d..1def55c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 import org.slf4j.Logger;
@@ -222,7 +223,6 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
         } finally {
             globalConsumer.unsubscribe();
         }
-
     }
 
     private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
@@ -339,7 +339,6 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
         }
     }
 
-
     @Override
     public void close() throws IOException {
         try {
@@ -394,6 +393,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
         }
     }
 
+    @Override
+    public TaskType taskType() {
+        return TaskType.GLOBAL;
+    }
 
     @Override
     public Map<TopicPartition, Long> changelogOffsets() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 145e889..db5cfc9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -16,9 +16,13 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.RecordContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -29,6 +33,8 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
  * {@link ThreadCache}
  */
 public interface InternalProcessorContext extends ProcessorContext {
+    BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();
+    ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer();
 
     @Override
     StreamsMetricsImpl metrics();
@@ -67,7 +73,7 @@ public interface InternalProcessorContext extends ProcessorContext {
     /**
      * Get the thread-global cache
      */
-    ThreadCache getCache();
+    ThreadCache cache();
 
     /**
      * Mark this context as being initialized
@@ -80,10 +86,21 @@ public interface InternalProcessorContext extends ProcessorContext {
     void uninitialize();
 
     /**
+     * @return the type of task (active/standby/global) that this context corresponds to
+     */
+    TaskType taskType();
+
+    /**
      * Get a correctly typed state store, given a handle on the original builder.
      */
     @SuppressWarnings("unchecked")
     default <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
         return (T) getStateStore(builder.name());
     }
+
+    void logChange(final String storeName,
+                   final Bytes key,
+                   final byte[] value,
+                   final long timestamp);
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index d390af5..c776367 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -16,87 +16,124 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.streams.KeyValue;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.internals.ApiUtils;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 import java.time.Duration;
 import java.util.List;
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
+import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
 public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
-
-    private final StreamTask task;
+    // The below are both null for standby tasks
+    private final StreamTask streamTask;
     private final RecordCollector collector;
+
     private final ToInternal toInternal = new ToInternal();
     private final static To SEND_TO_ALL = To.all();
 
+    final Map<String, String> storeToChangelogTopic = new HashMap<>();
+
     ProcessorContextImpl(final TaskId id,
-                         final StreamTask task,
+                         final StreamTask streamTask,
                          final StreamsConfig config,
                          final RecordCollector collector,
                          final ProcessorStateManager stateMgr,
                          final StreamsMetricsImpl metrics,
                          final ThreadCache cache) {
         super(id, config, metrics, stateMgr, cache);
-        this.task = task;
+        this.streamTask = streamTask;
         this.collector = collector;
+
+        if (streamTask == null && taskType() == TaskType.ACTIVE) {
+            throw new IllegalStateException("Tried to create context for active task but the streamtask was null");
+        }
+    }
+
+    ProcessorContextImpl(final TaskId id,
+                         final StreamsConfig config,
+                         final ProcessorStateManager stateMgr,
+                         final StreamsMetricsImpl metrics) {
+        this(
+            id,
+            null,
+            config,
+            null,
+            stateMgr,
+            metrics,
+            new ThreadCache(
+                new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
+                0,
+                metrics
+            )
+        );
     }
 
-    public ProcessorStateManager getStateMgr() {
+    public ProcessorStateManager stateManager() {
         return (ProcessorStateManager) stateManager;
     }
 
     @Override
+    public void register(final StateStore store,
+                         final StateRestoreCallback stateRestoreCallback) {
+        storeToChangelogTopic.put(store.name(), ProcessorStateManager.storeChangelogTopic(applicationId(), store.name()));
+        super.register(store, stateRestoreCallback);
+    }
+
+    @Override
     public RecordCollector recordCollector() {
         return collector;
     }
 
+    @Override
+    public void logChange(final String storeName,
+                          final Bytes key,
+                          final byte[] value,
+                          final long timestamp) {
+        throwUnsupportedOperationExceptionIfStandby("logChange");
+        // Sending null headers to changelog topics (KIP-244)
+        collector.send(
+            storeToChangelogTopic.get(storeName),
+            key,
+            value,
+            null,
+            taskId().partition,
+            timestamp,
+            BYTES_KEY_SERIALIZER,
+            BYTEARRAY_VALUE_SERIALIZER);
+    }
+
     /**
      * @throws StreamsException if an attempt is made to access this state store from an unknown node
+     * @throws UnsupportedOperationException if the current streamTask type is standby
      */
     @Override
     public StateStore getStateStore(final String name) {
+        throwUnsupportedOperationExceptionIfStandby("getStateStore");
         if (currentNode() == null) {
             throw new StreamsException("Accessing from an unknown node");
         }
 
-        final StateStore global = stateManager.getGlobalStore(name);
-        if (global != null) {
-            if (global instanceof TimestampedKeyValueStore) {
-                return new TimestampedKeyValueStoreReadOnlyDecorator<>((TimestampedKeyValueStore<?, ?>) global);
-            } else if (global instanceof KeyValueStore) {
-                return new KeyValueStoreReadOnlyDecorator<>((KeyValueStore<?, ?>) global);
-            } else if (global instanceof TimestampedWindowStore) {
-                return new TimestampedWindowStoreReadOnlyDecorator<>((TimestampedWindowStore<?, ?>) global);
-            } else if (global instanceof WindowStore) {
-                return new WindowStoreReadOnlyDecorator<>((WindowStore<?, ?>) global);
-            } else if (global instanceof SessionStore) {
-                return new SessionStoreReadOnlyDecorator<>((SessionStore<?, ?>) global);
-            }
-
-            return global;
+        final StateStore globalStore = stateManager.getGlobalStore(name);
+        if (globalStore != null) {
+            return getReadOnlyStore(globalStore);
         }
 
         if (!currentNode().stateStores.contains(name)) {
@@ -110,24 +147,13 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         final StateStore store = stateManager.getStore(name);
-        if (store instanceof TimestampedKeyValueStore) {
-            return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store);
-        } else if (store instanceof KeyValueStore) {
-            return new KeyValueStoreReadWriteDecorator<>((KeyValueStore<?, ?>) store);
-        } else if (store instanceof TimestampedWindowStore) {
-            return new TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>) store);
-        } else if (store instanceof WindowStore) {
-            return new WindowStoreReadWriteDecorator<>((WindowStore<?, ?>) store);
-        } else if (store instanceof SessionStore) {
-            return new SessionStoreReadWriteDecorator<>((SessionStore<?, ?>) store);
-        }
-
-        return store;
+        return getReadWriteStore(store);
     }
 
     @Override
     public <K, V> void forward(final K key,
                                final V value) {
+        throwUnsupportedOperationExceptionIfStandby("forward");
         forward(key, value, SEND_TO_ALL);
     }
 
@@ -136,6 +162,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     public <K, V> void forward(final K key,
                                final V value,
                                final int childIndex) {
+        throwUnsupportedOperationExceptionIfStandby("forward");
         forward(
             key,
             value,
@@ -147,6 +174,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     public <K, V> void forward(final K key,
                                final V value,
                                final String childName) {
+        throwUnsupportedOperationExceptionIfStandby("forward");
         forward(key, value, To.child(childName));
     }
 
@@ -155,6 +183,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     public <K, V> void forward(final K key,
                                final V value,
                                final To to) {
+        throwUnsupportedOperationExceptionIfStandby("forward");
         final ProcessorNode<?, ?> previousNode = currentNode();
         final ProcessorRecordContext previousContext = recordContext;
 
@@ -198,7 +227,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
     @Override
     public void commit() {
-        task.requestCommit();
+        throwUnsupportedOperationExceptionIfStandby("commit");
+        streamTask.requestCommit();
     }
 
     @Override
@@ -206,10 +236,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     public Cancellable schedule(final long intervalMs,
                                 final PunctuationType type,
                                 final Punctuator callback) {
+        throwUnsupportedOperationExceptionIfStandby("schedule");
         if (intervalMs < 1) {
             throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
         }
-        return task.schedule(intervalMs, type, callback);
+        return streamTask.schedule(intervalMs, type, callback);
     }
 
     @SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this
@@ -217,414 +248,57 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
                                 final Punctuator callback) throws IllegalArgumentException {
+        throwUnsupportedOperationExceptionIfStandby("schedule");
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
         return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback);
     }
 
-    private abstract static class StateStoreReadOnlyDecorator<T extends StateStore, K, V>
-        extends WrappedStateStore<T, K, V> {
-
-        static final String ERROR_MESSAGE = "Global store is read only";
-
-        private StateStoreReadOnlyDecorator(final T inner) {
-            super(inner);
-        }
-
-        @Override
-        public void flush() {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-
-        @Override
-        public void init(final ProcessorContext context,
-                         final StateStore root) {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-
-        @Override
-        public void close() {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-    }
-
-    private static class KeyValueStoreReadOnlyDecorator<K, V>
-        extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>, K, V>
-        implements KeyValueStore<K, V> {
-
-        private KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) {
-            super(inner);
-        }
-
-        @Override
-        public V get(final K key) {
-            return wrapped().get(key);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(final K from,
-                                            final K to) {
-            return wrapped().range(from, to);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            return wrapped().all();
-        }
-
-        @Override
-        public long approximateNumEntries() {
-            return wrapped().approximateNumEntries();
-        }
-
-        @Override
-        public void put(final K key,
-                        final V value) {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-
-        @Override
-        public V putIfAbsent(final K key,
-                             final V value) {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-
-        @Override
-        public void putAll(final List<KeyValue<K, V>> entries) {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-
-        @Override
-        public V delete(final K key) {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-    }
-
-    private static class TimestampedKeyValueStoreReadOnlyDecorator<K, V>
-        extends KeyValueStoreReadOnlyDecorator<K, ValueAndTimestamp<V>>
-        implements TimestampedKeyValueStore<K, V> {
-
-        private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore<K, V> inner) {
-            super(inner);
-        }
-    }
-
-    private static class WindowStoreReadOnlyDecorator<K, V>
-        extends StateStoreReadOnlyDecorator<WindowStore<K, V>, K, V>
-        implements WindowStore<K, V> {
-
-        private WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
-            super(inner);
-        }
-
-        @Deprecated
-        @Override
-        public void put(final K key,
-                        final V value) {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-
-        @Override
-        public void put(final K key,
-                        final V value,
-                        final long windowStartTimestamp) {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-
-        @Override
-        public V fetch(final K key,
-                       final long time) {
-            return wrapped().fetch(key, time);
-        }
-
-        @Override
-        @Deprecated
-        public WindowStoreIterator<V> fetch(final K key,
-                                            final long timeFrom,
-                                            final long timeTo) {
-            return wrapped().fetch(key, timeFrom, timeTo);
-        }
-
-        @Override
-        @Deprecated
-        public KeyValueIterator<Windowed<K>, V> fetch(final K from,
-                                                      final K to,
-                                                      final long timeFrom,
-                                                      final long timeTo) {
-            return wrapped().fetch(from, to, timeFrom, timeTo);
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<K>, V> all() {
-            return wrapped().all();
-        }
-
-        @Override
-        @Deprecated
-        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
-                                                         final long timeTo) {
-            return wrapped().fetchAll(timeFrom, timeTo);
-        }
-    }
-
-    private static class TimestampedWindowStoreReadOnlyDecorator<K, V>
-        extends WindowStoreReadOnlyDecorator<K, ValueAndTimestamp<V>>
-        implements TimestampedWindowStore<K, V> {
-
-        private TimestampedWindowStoreReadOnlyDecorator(final TimestampedWindowStore<K, V> inner) {
-            super(inner);
-        }
+    @Override
+    public String topic() {
+        throwUnsupportedOperationExceptionIfStandby("topic");
+        return super.topic();
     }
 
-    private static class SessionStoreReadOnlyDecorator<K, AGG>
-        extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>, K, AGG>
-        implements SessionStore<K, AGG> {
-
-        private SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) {
-            super(inner);
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
-                                                               final long earliestSessionEndTime,
-                                                               final long latestSessionStartTime) {
-            return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
-                                                               final K keyTo,
-                                                               final long earliestSessionEndTime,
-                                                               final long latestSessionStartTime) {
-            return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
-        }
-
-        @Override
-        public void remove(final Windowed<K> sessionKey) {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-
-        @Override
-        public void put(final Windowed<K> sessionKey,
-                        final AGG aggregate) {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-
-        @Override
-        public AGG fetchSession(final K key, final long startTime, final long endTime) {
-            return wrapped().fetchSession(key, startTime, endTime);
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
-            return wrapped().fetch(key);
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
-                                                        final K to) {
-            return wrapped().fetch(from, to);
-        }
+    @Override
+    public int partition() {
+        throwUnsupportedOperationExceptionIfStandby("partition");
+        return super.partition();
     }
 
-    private abstract static class StateStoreReadWriteDecorator<T extends StateStore, K, V>
-        extends WrappedStateStore<T, K, V> {
-
-        static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";
-
-        private StateStoreReadWriteDecorator(final T inner) {
-            super(inner);
-        }
-
-        @Override
-        public void init(final ProcessorContext context,
-                         final StateStore root) {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-
-        @Override
-        public void close() {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
+    @Override
+    public long offset() {
+        throwUnsupportedOperationExceptionIfStandby("offset");
+        return super.offset();
     }
 
-    static class KeyValueStoreReadWriteDecorator<K, V>
-        extends StateStoreReadWriteDecorator<KeyValueStore<K, V>, K, V>
-        implements KeyValueStore<K, V> {
-
-        KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
-            super(inner);
-        }
-
-        @Override
-        public V get(final K key) {
-            return wrapped().get(key);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(final K from,
-                                            final K to) {
-            return wrapped().range(from, to);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            return wrapped().all();
-        }
-
-        @Override
-        public long approximateNumEntries() {
-            return wrapped().approximateNumEntries();
-        }
-
-        @Override
-        public void put(final K key,
-                        final V value) {
-            wrapped().put(key, value);
-        }
-
-        @Override
-        public V putIfAbsent(final K key,
-                             final V value) {
-            return wrapped().putIfAbsent(key, value);
-        }
-
-        @Override
-        public void putAll(final List<KeyValue<K, V>> entries) {
-            wrapped().putAll(entries);
-        }
-
-        @Override
-        public V delete(final K key) {
-            return wrapped().delete(key);
-        }
+    @Override
+    public long timestamp() {
+        throwUnsupportedOperationExceptionIfStandby("timestamp");
+        return super.timestamp();
     }
 
-    static class TimestampedKeyValueStoreReadWriteDecorator<K, V>
-        extends KeyValueStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
-        implements TimestampedKeyValueStore<K, V> {
-
-        TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) {
-            super(inner);
-        }
+    @Override
+    public ProcessorNode<?, ?> currentNode() {
+        throwUnsupportedOperationExceptionIfStandby("currentNode");
+        return super.currentNode();
     }
 
-    static class WindowStoreReadWriteDecorator<K, V>
-        extends StateStoreReadWriteDecorator<WindowStore<K, V>, K, V>
-        implements WindowStore<K, V> {
-
-        WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
-            super(inner);
-        }
-
-        @Deprecated
-        @Override
-        public void put(final K key,
-                        final V value) {
-            wrapped().put(key, value);
-        }
-
-        @Override
-        public void put(final K key,
-                        final V value,
-                        final long windowStartTimestamp) {
-            wrapped().put(key, value, windowStartTimestamp);
-        }
-
-        @Override
-        public V fetch(final K key,
-                       final long time) {
-            return wrapped().fetch(key, time);
-        }
-
-        @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
-        @Override
-        public WindowStoreIterator<V> fetch(final K key,
-                                            final long timeFrom,
-                                            final long timeTo) {
-            return wrapped().fetch(key, timeFrom, timeTo);
-        }
-
-        @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
-        @Override
-        public KeyValueIterator<Windowed<K>, V> fetch(final K from,
-                                                      final K to,
-                                                      final long timeFrom,
-                                                      final long timeTo) {
-            return wrapped().fetch(from, to, timeFrom, timeTo);
-        }
-
-        @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
-        @Override
-        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
-                                                         final long timeTo) {
-            return wrapped().fetchAll(timeFrom, timeTo);
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<K>, V> all() {
-            return wrapped().all();
-        }
+    @Override
+    public void setRecordContext(final ProcessorRecordContext recordContext) {
+        throwUnsupportedOperationExceptionIfStandby("setRecordContext");
+        super.setRecordContext(recordContext);
     }
 
-    static class TimestampedWindowStoreReadWriteDecorator<K, V>
-        extends WindowStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
-        implements TimestampedWindowStore<K, V> {
-
-        TimestampedWindowStoreReadWriteDecorator(final TimestampedWindowStore<K, V> inner) {
-            super(inner);
-        }
+    @Override
+    public ProcessorRecordContext recordContext() {
+        throwUnsupportedOperationExceptionIfStandby("recordContext");
+        return super.recordContext();
     }
 
-    static class SessionStoreReadWriteDecorator<K, AGG>
-        extends StateStoreReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
-        implements SessionStore<K, AGG> {
-
-        SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
-            super(inner);
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
-                                                               final long earliestSessionEndTime,
-                                                               final long latestSessionStartTime) {
-            return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
-                                                               final K keyTo,
-                                                               final long earliestSessionEndTime,
-                                                               final long latestSessionStartTime) {
-            return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
-        }
-
-        @Override
-        public void remove(final Windowed<K> sessionKey) {
-            wrapped().remove(sessionKey);
-        }
-
-        @Override
-        public void put(final Windowed<K> sessionKey,
-                        final AGG aggregate) {
-            wrapped().put(sessionKey, aggregate);
-        }
-
-        @Override
-        public AGG fetchSession(final K key,
-                                final long startTime,
-                                final long endTime) {
-            return wrapped().fetchSession(key, startTime, endTime);
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
-            return wrapped().fetch(key);
-        }
-
-        @Override
-        public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
-                                                        final K to) {
-            return wrapped().fetch(from, to);
+    private void throwUnsupportedOperationExceptionIfStandby(final String operationName) {
+        if (taskType() == TaskType.STANDBY) {
+            throw new UnsupportedOperationException(
+                "this should not happen: " + operationName + "() is not supported in standby tasks.");
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index a3ab881..f7c1936 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -345,8 +345,8 @@ public class ProcessorStateManager implements StateManager {
         return sourcePartitions.contains(partition);
     }
 
-    // used by the changelog reader only
-    TaskType taskType() {
+    @Override
+    public TaskType taskType() {
         return taskType;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
deleted file mode 100644
index 9a94ad6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.time.Duration;
-
-class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
-
-    StandbyContextImpl(final TaskId id,
-                       final StreamsConfig config,
-                       final ProcessorStateManager stateMgr,
-                       final StreamsMetricsImpl metrics) {
-        super(
-            id,
-            config,
-            metrics,
-            stateMgr,
-            new ThreadCache(
-                new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
-                0,
-                metrics
-            )
-        );
-    }
-
-
-    StateManager getStateMgr() {
-        return stateManager;
-    }
-
-    @Override
-    public RecordCollector recordCollector() {
-        // return null collector specifically since in standby task it should not be called;
-        // if ever then we would throw NPE, which should never happen
-        return null;
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public StateStore getStateStore(final String name) {
-        throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public String topic() {
-        throw new UnsupportedOperationException("this should not happen: topic() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public int partition() {
-        throw new UnsupportedOperationException("this should not happen: partition() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public long offset() {
-        throw new UnsupportedOperationException("this should not happen: offset() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public long timestamp() {
-        throw new UnsupportedOperationException("this should not happen: timestamp() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public <K, V> void forward(final K key, final V value) {
-        throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public <K, V> void forward(final K key, final V value, final To to) {
-        throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    @Deprecated
-    public <K, V> void forward(final K key, final V value, final int childIndex) {
-        throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    @Deprecated
-    public <K, V> void forward(final K key, final V value, final String childName) {
-        throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public void commit() {
-        throw new UnsupportedOperationException("this should not happen: commit() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    @Deprecated
-    public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
-        throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException {
-        throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public ProcessorRecordContext recordContext() {
-        throw new UnsupportedOperationException("this should not happen: recordContext not supported in standby tasks.");
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public void setRecordContext(final ProcessorRecordContext recordContext) {
-        throw new UnsupportedOperationException("this should not happen: setRecordContext not supported in standby tasks.");
-    }
-
-    @Override
-    public void setCurrentNode(final ProcessorNode currentNode) {
-        // no-op. can't throw as this is called on commit when the StateStores get flushed.
-    }
-
-    /**
-     * @throws UnsupportedOperationException on every invocation
-     */
-    @Override
-    public ProcessorNode currentNode() {
-        throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 83e0d73..b4abd79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -70,7 +70,7 @@ public class StandbyTask extends AbstractTask implements Task {
         final LogContext logContext = new LogContext(logPrefix);
         log = logContext.logger(getClass());
 
-        processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
+        processorContext = new ProcessorContextImpl(id, config, stateMgr, metrics);
         closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
         eosEnabled = StreamThread.eosEnabled(config);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
index 0cb49754..674ea18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
-
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 
 interface StateManager {
     File baseDir();
@@ -46,6 +46,8 @@ interface StateManager {
 
     void close() throws IOException;
 
+    TaskType taskType();
+
     // TODO: we can remove this when consolidating global state manager into processor state manager
     StateStore getGlobalStore(final String name);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 14f4e54..8026b04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -69,7 +69,7 @@ public class CachingKeyValueStore
     private void initInternal(final ProcessorContext context) {
         this.context = (InternalProcessorContext) context;
 
-        this.cache = this.context.getCache();
+        this.cache = this.context.cache();
         this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), name());
         cache.addDirtyEntryFlushListener(cacheName, entries -> {
             for (final ThreadCache.DirtyEntry entry : entries) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index f537d4c..4976ef1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -72,7 +72,7 @@ class CachingSessionStore
         this.context = context;
 
         cacheName = context.taskId() + "-" + name();
-        cache = context.getCache();
+        cache = context.cache();
         cache.addDirtyEntryFlushListener(cacheName, entries -> {
             for (final ThreadCache.DirtyEntry entry : entries) {
                 putAndMaybeForward(entry, context);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index d2bd02e..e71f87e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -84,7 +84,7 @@ class CachingWindowStore
             Serdes.Bytes(),
             Serdes.ByteArray());
         name = context.taskId() + "-" + name();
-        cache = this.context.getCache();
+        cache = this.context.cache();
 
         cache.addDirtyEntryFlushListener(name, entries -> {
             for (final ThreadCache.DirtyEntry entry : entries) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index a924af6..35f6d36 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -16,15 +16,13 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.List;
 
@@ -32,7 +30,7 @@ public class ChangeLoggingKeyValueBytesStore
     extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]>
     implements KeyValueStore<Bytes, byte[]> {
 
-    StoreChangeLogger<Bytes, byte[]> changeLogger;
+    InternalProcessorContext context;
 
     ChangeLoggingKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) {
         super(inner);
@@ -42,11 +40,7 @@ public class ChangeLoggingKeyValueBytesStore
     public void init(final ProcessorContext context,
                      final StateStore root) {
         super.init(context, root);
-        final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
-        changeLogger = new StoreChangeLogger<>(
-            name(),
-            context,
-            new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
+        this.context = (InternalProcessorContext) context;
 
         // if the inner store is an LRU cache, add the eviction listener to log removed record
         if (wrapped() instanceof MemoryLRUCache) {
@@ -113,6 +107,6 @@ public class ChangeLoggingKeyValueBytesStore
 
     void log(final Bytes key,
              final byte[] value) {
-        changeLogger.logChange(key, value);
+        context.logChange(name(), key, value, context.timestamp());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 361f8a5..cc586d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -16,15 +16,13 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.StateSerdes;
 
 /**
  * Simple wrapper around a {@link SessionStore} to support writing
@@ -34,7 +32,7 @@ class ChangeLoggingSessionBytesStore
     extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]>
     implements SessionStore<Bytes, byte[]> {
 
-    private StoreChangeLogger<Bytes, byte[]> changeLogger;
+    private InternalProcessorContext context;
 
     ChangeLoggingSessionBytesStore(final SessionStore<Bytes, byte[]> bytesStore) {
         super(bytesStore);
@@ -43,16 +41,9 @@ class ChangeLoggingSessionBytesStore
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         super.init(context, root);
-        final String topic = ProcessorStateManager.storeChangelogTopic(
-                context.applicationId(),
-                name());
-        changeLogger = new StoreChangeLogger<>(
-                name(),
-                context,
-                new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
+        this.context = (InternalProcessorContext) context;
     }
 
-
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) {
         return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
@@ -66,14 +57,13 @@ class ChangeLoggingSessionBytesStore
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
         wrapped().remove(sessionKey);
-        changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), null);
+        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp());
     }
 
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
         wrapped().put(sessionKey, aggregate);
-        changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), aggregate);
-
+        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, context.timestamp());
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
index 02e4c6a..7cdac97 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
@@ -32,9 +32,9 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey
     void log(final Bytes key,
              final byte[] valueAndTimestamp) {
         if (valueAndTimestamp != null) {
-            changeLogger.logChange(key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp));
+            context.logChange(name(), key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp));
         } else {
-            changeLogger.logChange(key, null);
+            context.logChange(name(), key, null, context.timestamp());
         }
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
index 94362d4..3714150 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
@@ -33,9 +33,9 @@ class ChangeLoggingTimestampedWindowBytesStore extends ChangeLoggingWindowBytesS
     void log(final Bytes key,
              final byte[] valueAndTimestamp) {
         if (valueAndTimestamp != null) {
-            changeLogger.logChange(key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp));
+            context.logChange(name(), key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp));
         } else {
-            changeLogger.logChange(key, null);
+            context.logChange(name(), key, null, context.timestamp());
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 8a9b91a..a04eb2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -16,14 +16,12 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
@@ -36,11 +34,9 @@ class ChangeLoggingWindowBytesStore
     implements WindowStore<Bytes, byte[]> {
 
     private final boolean retainDuplicates;
-    private ProcessorContext context;
+    InternalProcessorContext context;
     private int seqnum = 0;
 
-    StoreChangeLogger<Bytes, byte[]> changeLogger;
-
     ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore,
                                   final boolean retainDuplicates) {
         super(bytesStore);
@@ -50,13 +46,8 @@ class ChangeLoggingWindowBytesStore
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = (InternalProcessorContext) context;
         super.init(context, root);
-        final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
-        changeLogger = new StoreChangeLogger<>(
-            name(),
-            context,
-            new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
     }
 
     @Override
@@ -114,7 +105,7 @@ class ChangeLoggingWindowBytesStore
 
     void log(final Bytes key,
              final byte[] value) {
-        changeLogger.logChange(key, value);
+        context.logChange(name(), key, value, context.timestamp());
     }
 
     private int maybeUpdateSeqnumForDups() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
deleted file mode 100644
index 7358120..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.StateSerdes;
-
-/**
- * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
- * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
- * i.e. use {@code StoreChangeLogger<Bytes, ...>} rather than {@code StoreChangeLogger<byte[], ...>}.
- *
- * @param <K>
- * @param <V>
- */
-class StoreChangeLogger<K, V> {
-
-    private final String topic;
-    private final int partition;
-    private final ProcessorContext context;
-    private final RecordCollector collector;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
-
-    StoreChangeLogger(final String storeName,
-                      final ProcessorContext context,
-                      final StateSerdes<K, V> serialization) {
-        this(storeName, context, context.taskId().partition, serialization);
-    }
-
-    private StoreChangeLogger(final String storeName,
-                              final ProcessorContext context,
-                              final int partition,
-                              final StateSerdes<K, V> serialization) {
-        topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
-        this.context = context;
-        this.partition = partition;
-        this.collector = ((RecordCollector.Supplier) context).recordCollector();
-        keySerializer = serialization.keySerializer();
-        valueSerializer = serialization.valueSerializer();
-    }
-
-    void logChange(final K key,
-                   final V value) {
-        logChange(key, value, context.timestamp());
-    }
-
-    void logChange(final K key,
-                   final V value,
-                   final long timestamp) {
-        // Sending null headers to changelog topics (KIP-244)
-        collector.send(topic, key, value, null, partition, timestamp, keySerializer, valueSerializer);
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 4a3ee7b..72b415f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.Cancellable;
@@ -180,7 +181,7 @@ public class AbstractProcessorContextTest {
     public void appConfigsShouldReturnUnrecognizedValues() {
         assertThat(
             context.appConfigs().get("user.supplied.config"),
-            equalTo("user-suppplied-value"));
+            equalTo("user-supplied-value"));
     }
 
 
@@ -190,7 +191,7 @@ public class AbstractProcessorContextTest {
             config = getStreamsConfig();
             // Value must be a string to test className -> class conversion
             config.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBConfigSetter.class.getName());
-            config.put("user.supplied.config", "user-suppplied-value");
+            config.put("user.supplied.config", "user-supplied-value");
         }
 
         TestProcessorContext(final MockStreamsMetrics metrics) {
@@ -233,5 +234,12 @@ public class AbstractProcessorContextTest {
 
         @Override
         public void commit() {}
+
+        @Override
+        public void logChange(final String storeName,
+                              final Bytes key,
+                              final byte[] value,
+                              final long timestamp) {
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 8443e55..e4fe6ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@@ -72,6 +73,7 @@ public class GlobalProcessorContextImplTest {
         expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME)).andReturn(mock(TimestampedWindowStore.class));
         expect(stateManager.getGlobalStore(GLOBAL_SESSION_STORE_NAME)).andReturn(mock(SessionStore.class));
         expect(stateManager.getGlobalStore(UNKNOWN_STORE)).andReturn(null);
+        expect(stateManager.taskType()).andStubReturn(TaskType.GLOBAL);
         replay(stateManager);
 
         globalContext = new GlobalProcessorContextImpl(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 5b52e9d..41cfdfa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -16,13 +16,18 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.time.Duration;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -33,6 +38,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -43,6 +49,8 @@ import java.util.List;
 import java.util.function.Consumer;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.BYTES_KEY_SERIALIZER;
+import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.BYTEARRAY_VALUE_SERIALIZER;
 import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
@@ -50,13 +58,19 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class ProcessorContextImplTest {
     private ProcessorContextImpl context;
 
+    private final StreamsConfig streamsConfig = streamsConfigMock();
+
+    private RecordCollector recordCollector = mock(RecordCollector.class);
+
     private static final String KEY = "key";
     private static final long VALUE = 42L;
     private static final ValueAndTimestamp<Long> VALUE_AND_TIMESTAMP = ValueAndTimestamp.make(42L, 21L);
@@ -99,13 +113,8 @@ public class ProcessorContextImplTest {
             timestampedIters.add(i, mock(KeyValueIterator.class));
         }
 
-        final StreamsConfig streamsConfig = mock(StreamsConfig.class);
-        expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("add-id");
-        expect(streamsConfig.defaultValueSerde()).andReturn(Serdes.ByteArray());
-        expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray());
-        replay(streamsConfig);
-
         final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
+        expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
 
         expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock());
         expect(stateManager.getGlobalStore("GlobalTimestampedKeyValueStore")).andReturn(timestampedKeyValueStoreMock());
@@ -125,7 +134,7 @@ public class ProcessorContextImplTest {
             mock(TaskId.class),
             mock(StreamTask.class),
             streamsConfig,
-            mock(RecordCollector.class),
+            recordCollector,
             stateManager,
             mock(StreamsMetricsImpl.class),
             mock(ThreadCache.class)
@@ -140,6 +149,18 @@ public class ProcessorContextImplTest {
                 "LocalSessionStore"))));
     }
 
+    private ProcessorContextImpl getStandbyContext() {
+        final ProcessorStateManager stateManager = EasyMock.createNiceMock(ProcessorStateManager.class);
+        expect(stateManager.taskType()).andStubReturn(TaskType.STANDBY);
+        replay(stateManager);
+        return new ProcessorContextImpl(
+            mock(TaskId.class),
+            streamsConfig,
+            stateManager,
+            mock(StreamsMetricsImpl.class)
+        );
+    }
+
     @Test
     public void globalKeyValueStoreShouldBeReadOnly() {
         doTest("GlobalKeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> {
@@ -347,6 +368,165 @@ public class ProcessorContextImplTest {
         });
     }
 
+    @Test
+    public void shouldNotSendRecordHeadersToChangelogTopic() {
+        final Bytes key = Bytes.wrap("key".getBytes());
+        final byte[] value = "zero".getBytes();
+
+        recordCollector.send(null, key, value, null, 0, 42L, BYTES_KEY_SERIALIZER, BYTEARRAY_VALUE_SERIALIZER);
+
+        replay(recordCollector);
+        context.logChange("Store", key, value, 42L);
+
+        verify(recordCollector);
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnLogChange() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.logChange("Store", Bytes.wrap("k".getBytes()), null, 0L)
+        );
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnGetStateStore() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.getStateStore("store")
+        );
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnForward() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.forward("key", "value")
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnForwardWithChildIndex() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.forward("key", "value", 0)
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnForwardWithChildName() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.forward("key", "value", "child-name")
+        );
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.forward("key", "value", To.child("child-name"))
+        );
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnCommit() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.commit()
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnScheduleWithInterval() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.schedule(100L, PunctuationType.STREAM_TIME, t -> { })
+        );
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnSchedule() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, t -> { })
+        );
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnTopic() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.topic()
+        );
+    }
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnPartition() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.partition()
+        );
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnOffset() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.offset()
+        );
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnTimestamp() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.timestamp()
+        );
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnCurrentNode() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.currentNode()
+        );
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnSetRecordContext() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.setRecordContext(mock(ProcessorRecordContext.class))
+        );
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationExceptionOnRecordContext() {
+        context = getStandbyContext();
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> context.recordContext()
+        );
+    }
+
     @SuppressWarnings("unchecked")
     private KeyValueStore<String, Long> keyValueStoreMock() {
         final KeyValueStore<String, Long> keyValueStoreMock = mock(KeyValueStore.class);
@@ -511,6 +691,15 @@ public class ProcessorContextImplTest {
         return sessionStore;
     }
 
+    private StreamsConfig streamsConfigMock() {
+        final StreamsConfig streamsConfig = mock(StreamsConfig.class);
+        expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andStubReturn("add-id");
+        expect(streamsConfig.defaultValueSerde()).andStubReturn(Serdes.ByteArray());
+        expect(streamsConfig.defaultKeySerde()).andStubReturn(Serdes.ByteArray());
+        replay(streamsConfig);
+        return streamsConfig;
+    }
+
     private void initStateStoreMock(final StateStore stateStore) {
         expect(stateStore.name()).andReturn(STORE_NAME);
         expect(stateStore.persistent()).andReturn(true);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java
index 45e0165..44f01b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.junit.Before;
@@ -43,14 +44,18 @@ public class ProcessorContextTest {
         expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("add-id");
         expect(streamsConfig.defaultValueSerde()).andReturn(Serdes.ByteArray());
         expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray());
-        replay(streamsConfig);
+
+        final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
+        expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
+
+        replay(streamsConfig, stateManager);
 
         context = new ProcessorContextImpl(
             mock(TaskId.class),
             mock(StreamTask.class),
             streamsConfig,
             mock(RecordCollector.class),
-            mock(ProcessorStateManager.class),
+            stateManager,
             mock(StreamsMetricsImpl.class),
             mock(ThreadCache.class)
         );
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index c993d1d..7c14f23 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.MockKeyValueStoreBuilder;
@@ -144,7 +145,9 @@ public class StandbyTaskTest {
     public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
         stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
         EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false);
-        EasyMock.replay(stateDirectory);
+        EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.STANDBY);
+
+        EasyMock.replay(stateDirectory, stateManager);
 
         task = createStandbyTask();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
index bf9abee..cf1eafd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateStore;
 
 import java.io.File;
 import java.util.Map;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 
 public class StateManagerStub implements StateManager {
 
@@ -59,4 +60,9 @@ public class StateManagerStub implements StateManager {
     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {}
 
+    @Override
+    public TaskType taskType() {
+        return null;
+    }
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 89740c3..7a39121 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -249,7 +249,7 @@ public class KeyValueStoreTestDriver<K, V> {
             final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1024 * 1024L, metrics());
 
             @Override
-            public ThreadCache getCache() {
+            public ThreadCache cache() {
                 return cache;
             }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 79d66bf..5ab035c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -31,12 +31,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-
 @RunWith(EasyMockRunner.class)
 public class ChangeLoggingSessionBytesStoreTest {
 
@@ -75,13 +69,15 @@ public class ChangeLoggingSessionBytesStoreTest {
 
         init();
 
-        store.put(key1, value1);
+        final Bytes binaryKey = SessionKeySchema.toBinary(key1);
 
-        assertThat(collector.collected().size(), equalTo(1));
-        assertThat(collector.collected().get(0).key(), equalTo(SessionKeySchema.toBinary(key1)));
-        assertThat(collector.collected().get(0).value(), equalTo(value1));
+        EasyMock.reset(context);
+        context.logChange(store.name(), binaryKey, value1, 0L);
 
-        EasyMock.verify(inner);
+        EasyMock.replay(context);
+        store.put(key1, value1);
+
+        EasyMock.verify(inner, context);
     }
 
     @Test
@@ -93,11 +89,14 @@ public class ChangeLoggingSessionBytesStoreTest {
         store.remove(key1);
 
         final Bytes binaryKey = SessionKeySchema.toBinary(key1);
-        assertThat(collector.collected().size(), equalTo(1));
-        assertThat(collector.collected().get(0).key(), equalTo(binaryKey));
-        assertThat(collector.collected().get(0).value(), nullValue());
 
-        EasyMock.verify(inner);
+        EasyMock.reset(context);
+        context.logChange(store.name(), binaryKey, null, 0L);
+
+        EasyMock.replay(context);
+        store.remove(key1);
+
+        EasyMock.verify(inner, context);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index bde6d05..4a240b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -31,9 +31,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import static java.time.Instant.ofEpochMilli;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
 
 @RunWith(EasyMockRunner.class)
 public class ChangeLoggingTimestampedWindowBytesStoreTest {
@@ -75,15 +72,15 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
 
         init();
 
-        store.put(bytesKey, valueAndTimestamp);
-
         final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
-        assertThat(collector.collected().size(), equalTo(1));
-        assertThat(collector.collected().get(0).key(), equalTo(key));
-        assertThat(collector.collected().get(0).value(), equalTo(value));
-        assertThat(collector.collected().get(0).timestamp(), equalTo(42L));
 
-        EasyMock.verify(inner);
+        EasyMock.reset(context);
+        context.logChange(store.name(), key, value, 42);
+
+        EasyMock.replay(context);
+        store.put(bytesKey, valueAndTimestamp);
+
+        EasyMock.verify(inner, context);
     }
 
     @Test
@@ -118,20 +115,20 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
         EasyMock.expectLastCall().times(2);
 
         init();
-        store.put(bytesKey, valueAndTimestamp);
-        store.put(bytesKey, valueAndTimestamp);
 
         final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
         final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
-        assertThat(collector.collected().size(), equalTo(2));
-        assertThat(collector.collected().get(0).key(), equalTo(key1));
-        assertThat(collector.collected().get(0).value(), equalTo(value));
-        assertThat(collector.collected().get(0).timestamp(), equalTo(42L));
-        assertThat(collector.collected().get(1).key(), equalTo(key2));
-        assertThat(collector.collected().get(1).value(), equalTo(value));
-        assertThat(collector.collected().get(1).timestamp(), equalTo(42L));
 
-        EasyMock.verify(inner);
+        EasyMock.reset(context);
+        context.logChange(store.name(), key1, value, 42L);
+        context.logChange(store.name(), key2, value, 42L);
+
+        EasyMock.replay(context);
+
+        store.put(bytesKey, valueAndTimestamp);
+        store.put(bytesKey, valueAndTimestamp);
+
+        EasyMock.verify(inner, context);
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index ce60548..f4cb523 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -31,9 +31,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import static java.time.Instant.ofEpochMilli;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
 
 @RunWith(EasyMockRunner.class)
 public class ChangeLoggingWindowBytesStoreTest {
@@ -74,15 +71,16 @@ public class ChangeLoggingWindowBytesStoreTest {
 
         init();
 
-        store.put(bytesKey, value);
-
         final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
-        assertThat(collector.collected().size(), equalTo(1));
-        assertThat(collector.collected().get(0).key(), equalTo(key));
-        assertThat(collector.collected().get(0).value(), equalTo(value));
-        assertThat(collector.collected().get(0).timestamp(), equalTo(0L));
 
-        EasyMock.verify(inner);
+        EasyMock.reset(context);
+        EasyMock.expect(context.timestamp()).andStubReturn(0L);
+        context.logChange(store.name(), key, value, 0L);
+
+        EasyMock.replay(context);
+        store.put(bytesKey, value);
+
+        EasyMock.verify(inner, context);
     }
 
     @Test
@@ -113,24 +111,26 @@ public class ChangeLoggingWindowBytesStoreTest {
     @SuppressWarnings("deprecation")
     public void shouldRetainDuplicatesWhenSet() {
         store = new ChangeLoggingWindowBytesStore(inner, true);
+
         inner.put(bytesKey, value, 0);
         EasyMock.expectLastCall().times(2);
 
         init();
-        store.put(bytesKey, value);
-        store.put(bytesKey, value);
 
         final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
         final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
-        assertThat(collector.collected().size(), equalTo(2));
-        assertThat(collector.collected().get(0).key(), equalTo(key1));
-        assertThat(collector.collected().get(0).value(), equalTo(value));
-        assertThat(collector.collected().get(0).timestamp(), equalTo(0L));
-        assertThat(collector.collected().get(1).key(), equalTo(key2));
-        assertThat(collector.collected().get(1).value(), equalTo(value));
-        assertThat(collector.collected().get(1).timestamp(), equalTo(0L));
 
-        EasyMock.verify(inner);
+        EasyMock.reset(context);
+        EasyMock.expect(context.timestamp()).andStubReturn(0L);
+        context.logChange(store.name(), key1, value, 0L);
+        context.logChange(store.name(), key2, value, 0L);
+
+        EasyMock.replay(context);
+
+        store.put(bytesKey, value);
+        store.put(bytesKey, value);
+
+        EasyMock.verify(inner, context);
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
deleted file mode 100644
index c5a89da..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-
-import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockRecordCollector;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-
-public class StoreChangeLoggerTest {
-
-    private final String topic = "topic";
-
-    private final MockRecordCollector collector = new MockRecordCollector();
-    private final InternalMockProcessorContext context = new InternalMockProcessorContext(
-        StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
-        collector);
-
-    private final StoreChangeLogger<Integer, String> changeLogger =
-        new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class));
-
-    @Test
-    public void testAddRemove() {
-        context.setTime(1);
-        changeLogger.logChange(0, "zero");
-        context.setTime(5);
-        changeLogger.logChange(1, "one");
-        changeLogger.logChange(2, "two");
-        changeLogger.logChange(3, "three", 42L);
-        context.setTime(9);
-        changeLogger.logChange(0, null);
-
-        assertThat(collector.collected().size(), equalTo(5));
-        assertThat(collector.collected().get(0).key(), equalTo(0));
-        assertThat(collector.collected().get(0).value(), equalTo("zero"));
-        assertThat(collector.collected().get(0).timestamp(), equalTo(1L));
-        assertThat(collector.collected().get(1).key(), equalTo(1));
-        assertThat(collector.collected().get(1).value(), equalTo("one"));
-        assertThat(collector.collected().get(1).timestamp(), equalTo(5L));
-        assertThat(collector.collected().get(2).key(), equalTo(2));
-        assertThat(collector.collected().get(2).value(), equalTo("two"));
-        assertThat(collector.collected().get(2).timestamp(), equalTo(5L));
-        assertThat(collector.collected().get(3).key(), equalTo(3));
-        assertThat(collector.collected().get(3).value(), equalTo("three"));
-        assertThat(collector.collected().get(3).timestamp(), equalTo(42L));
-        assertThat(collector.collected().get(4).key(), equalTo(0));
-        assertThat(collector.collected().get(4).value(), nullValue());
-        assertThat(collector.collected().get(4).timestamp(), equalTo(9L));
-    }
-
-    @Test
-    public void shouldNotSendRecordHeadersToChangelogTopic() {
-        context.headers().add(new RecordHeader("key", "value".getBytes()));
-        changeLogger.logChange(0, "zero", 42L);
-
-        assertThat(collector.collected().size(), equalTo(1));
-        assertThat(collector.collected().get(0).key(), equalTo(0));
-        assertThat(collector.collected().get(0).value(), equalTo("zero"));
-        assertThat(collector.collected().get(0).timestamp(), equalTo(42L));
-        assertThat(collector.collected().get(0).headers().toArray(), equalTo(Record.EMPTY_HEADERS));
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
index 3def253..ae825bc 100644
--- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import java.io.File;
 import java.util.Map;
 import java.util.Set;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 
 public class GlobalStateManagerStub implements GlobalStateManager {
 
@@ -82,4 +83,9 @@ public class GlobalStateManagerStub implements GlobalStateManager {
     public Map<TopicPartition, Long> changelogOffsets() {
         return offsets;
     }
+
+    @Override
+    public TaskType taskType() {
+        return TaskType.GLOBAL;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 8cd784f..81876e4 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -38,6 +39,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.ToInternal;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -349,6 +351,27 @@ public class InternalMockProcessorContext
         return recordContext.headers();
     }
 
+    @Override
+    public TaskType taskType() {
+        return TaskType.ACTIVE;
+    }
+
+    @Override
+    public void logChange(final String storeName,
+                          final Bytes key,
+                          final byte[] value,
+                          final long timestamp) {
+        recordCollector().send(
+            storeName + "-changelog",
+            key,
+            value,
+            null,
+            taskId().partition,
+            timestamp,
+            BYTES_KEY_SERIALIZER,
+            BYTEARRAY_VALUE_SERIALIZER);
+    }
+
     public StateRestoreListener getRestoreListener(final String storeName) {
         return getStateRestoreListener(restoreFuncs.get(storeName));
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 4b7cf49..e375085 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -24,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -38,6 +40,7 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
     private ProcessorNode currentNode;
     private RecordCollector recordCollector;
     private long currentSystemTimeMs;
+    private TaskType taskType = TaskType.ACTIVE;
 
     public MockInternalProcessorContext() {
     }
@@ -88,7 +91,7 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
     }
 
     @Override
-    public ThreadCache getCache() {
+    public ThreadCache cache() {
         return null;
     }
 
@@ -116,4 +119,16 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
     public StateRestoreCallback stateRestoreCallback(final String storeName) {
         return restoreCallbacks.get(storeName);
     }
+
+    @Override
+    public TaskType taskType() {
+        return taskType;
+    }
+
+    @Override
+    public void logChange(final String storeName,
+                          final Bytes key,
+                          final byte[] value,
+                          final long timestamp) {
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 77dd418..da8b7b4 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
@@ -32,6 +33,7 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 
 public class NoOpProcessorContext extends AbstractProcessorContext {
     public boolean initialized;
@@ -101,5 +103,18 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
 
     @Override
     public void register(final StateStore store,
-                         final StateRestoreCallback stateRestoreCallback) {}
+                         final StateRestoreCallback stateRestoreCallback) {
+    }
+
+    @Override
+    public TaskType taskType() {
+        return TaskType.ACTIVE;
+    }
+
+    @Override
+    public void logChange(final String storeName,
+                          final Bytes key,
+                          final byte[] value,
+                          final long timestamp) {
+    }
 }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index e78f966..8475172 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -991,7 +991,7 @@ public class TopologyTestDriver implements Closeable {
     private StateStore getStateStore(final String name,
                                      final boolean throwForBuiltInStores) {
         if (task != null) {
-            final StateStore stateStore = ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
+            final StateStore stateStore = ((ProcessorContextImpl) task.context()).stateManager().getStore(name);
             if (stateStore != null) {
                 if (throwForBuiltInStores) {
                     throwIfBuiltInStore(stateStore);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 73da6ef..b16eb32 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -75,6 +75,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     private final List<CapturedForward> capturedForwards = new LinkedList<>();
     private boolean committed = false;
 
+
     /**
      * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
      */