You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/11 09:44:28 UTC

[kafka] branch trunk updated: KAFKA-6970: All standard state stores guarded with read only wrapper (#6016)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c142809  KAFKA-6970: All standard state stores guarded with read only wrapper (#6016)
c142809 is described below

commit c142809038e67625638165462ceefd3f1191e0e9
Author: Nikolay <ni...@apache.org>
AuthorDate: Tue Dec 11 12:44:18 2018 +0300

    KAFKA-6970: All standard state stores guarded with read only wrapper (#6016)
    
    Reviewer: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../streams/kstream/internals/TupleForwarder.java  |  15 +-
 .../processor/internals/ProcessorContextImpl.java  | 232 +++++++++++++++++----
 .../streams/state/internals/WrappedStateStore.java |   2 +-
 .../internals/ProcessorContextImplTest.java        | 225 ++++++++++++++++----
 .../processor/internals/ProcessorTopologyTest.java |   5 -
 5 files changed, 385 insertions(+), 94 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index aec0d16..127057f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -48,9 +48,18 @@ class TupleForwarder<K, V> {
     private CachedStateStore cachedStateStore(final StateStore store) {
         if (store instanceof CachedStateStore) {
             return (CachedStateStore) store;
-        } else if (store instanceof WrappedStateStore
-                && ((WrappedStateStore) store).wrappedStore() instanceof CachedStateStore) {
-            return (CachedStateStore) ((WrappedStateStore) store).wrappedStore();
+        } else if (store instanceof WrappedStateStore) {
+            StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+
+            while (wrapped instanceof WrappedStateStore && !(wrapped instanceof CachedStateStore)) {
+                wrapped = ((WrappedStateStore) wrapped).wrappedStore();
+            }
+
+            if (!(wrapped instanceof CachedStateStore)) {
+                return null;
+            }
+
+            return (CachedStateStore) wrapped;
         }
         return null;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c79ec35..e7dd4db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.internals.ApiUtils;
@@ -37,6 +38,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.time.Duration;
 import java.util.List;
+import org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore;
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 
@@ -102,7 +104,16 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                     "please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
         }
 
-        return stateManager.getStore(name);
+        final StateStore store = stateManager.getStore(name);
+        if (store instanceof KeyValueStore) {
+            return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
+        } else if (store instanceof WindowStore) {
+            return new WindowStoreReadWriteDecorator((WindowStore) store);
+        } else if (store instanceof SessionStore) {
+            return new SessionStoreReadWriteDecorator((SessionStore) store);
+        }
+
+        return store;
     }
 
     @SuppressWarnings("unchecked")
@@ -196,23 +207,16 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         return streamTimeSupplier.get();
     }
 
-    private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> implements StateStore {
+    private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> extends AbstractStateStore {
         static final String ERROR_MESSAGE = "Global store is read only";
 
-        final T underlying;
-
-        StateStoreReadOnlyDecorator(final T underlying) {
-            this.underlying = underlying;
-        }
-
-        @Override
-        public String name() {
-            return underlying.name();
+        StateStoreReadOnlyDecorator(final T inner) {
+            super(inner);
         }
 
-        @Override
-        public void init(final ProcessorContext context, final StateStore root) {
-            underlying.init(context, root);
+        @SuppressWarnings("unchecked")
+        T getInner() {
+            return (T) wrappedStore();
         }
 
         @Override
@@ -221,44 +225,39 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         @Override
-        public void close() {
-            underlying.close();
-        }
-
-        @Override
-        public boolean persistent() {
-            return underlying.persistent();
+        public void init(final ProcessorContext context, final StateStore root) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
         }
 
         @Override
-        public boolean isOpen() {
-            return underlying.isOpen();
+        public void close() {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
         }
     }
 
     private static class KeyValueStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> {
-        KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> underlying) {
-            super(underlying);
+        KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) {
+            super(inner);
         }
 
         @Override
         public V get(final K key) {
-            return underlying.get(key);
+            return getInner().get(key);
         }
 
         @Override
         public KeyValueIterator<K, V> range(final K from, final K to) {
-            return underlying.range(from, to);
+            return getInner().range(from, to);
         }
 
         @Override
         public KeyValueIterator<K, V> all() {
-            return underlying.all();
+            return getInner().all();
         }
 
         @Override
         public long approximateNumEntries() {
-            return underlying.approximateNumEntries();
+            return getInner().approximateNumEntries();
         }
 
         @Override
@@ -283,8 +282,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     }
 
     private static class WindowStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
-        WindowStoreReadOnlyDecorator(final WindowStore<K, V> underlying) {
-            super(underlying);
+        WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
+            super(inner);
         }
 
         @Override
@@ -299,46 +298,46 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
         @Override
         public V fetch(final K key, final long time) {
-            return underlying.fetch(key, time);
+            return getInner().fetch(key, time);
         }
 
         @Deprecated
         @Override
         public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
-            return underlying.fetch(key, timeFrom, timeTo);
+            return getInner().fetch(key, timeFrom, timeTo);
         }
 
         @Deprecated
         @Override
         public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
-            return underlying.fetch(from, to, timeFrom, timeTo);
+            return getInner().fetch(from, to, timeFrom, timeTo);
         }
 
         @Override
         public KeyValueIterator<Windowed<K>, V> all() {
-            return underlying.all();
+            return getInner().all();
         }
 
         @Deprecated
         @Override
         public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
-            return underlying.fetchAll(timeFrom, timeTo);
+            return getInner().fetchAll(timeFrom, timeTo);
         }
     }
 
     private static class SessionStoreReadOnlyDecorator<K, AGG> extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>> implements SessionStore<K, AGG> {
-        SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> underlying) {
-            super(underlying);
+        SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) {
+            super(inner);
         }
 
         @Override
         public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
-            return underlying.findSessions(key, earliestSessionEndTime, latestSessionStartTime);
+            return getInner().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
         }
 
         @Override
         public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
-            return underlying.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
+            return getInner().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
         }
 
         @Override
@@ -353,12 +352,161 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
         @Override
         public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
-            return underlying.fetch(key);
+            return getInner().fetch(key);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
+            return getInner().fetch(from, to);
+        }
+    }
+
+    private abstract static class StateStoreReadWriteDecorator<T extends StateStore> extends AbstractStateStore {
+        static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";
+
+        StateStoreReadWriteDecorator(final T inner) {
+            super(inner);
+        }
+
+        @SuppressWarnings("unchecked")
+        T wrapped() {
+            return (T) super.wrappedStore();
+        }
+
+        @Override
+        public void init(final ProcessorContext context, final StateStore root) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public void close() {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+    }
+
+    private static class KeyValueStoreReadWriteDecorator<K, V> extends StateStoreReadWriteDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> {
+        KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
+            super(inner);
+        }
+
+        @Override
+        public V get(final K key) {
+            return wrapped().get(key);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(final K from, final K to) {
+            return wrapped().range(from, to);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return wrapped().all();
+        }
+
+        @Override
+        public long approximateNumEntries() {
+            return wrapped().approximateNumEntries();
+        }
+
+        @Override
+        public void put(final K key, final V value) {
+            wrapped().put(key, value);
+        }
+
+        @Override
+        public V putIfAbsent(final K key, final V value) {
+            return wrapped().putIfAbsent(key, value);
+        }
+
+        @Override
+        public void putAll(final List<KeyValue<K, V>> entries) {
+            wrapped().putAll(entries);
+        }
+
+        @Override
+        public V delete(final K key) {
+            return wrapped().delete(key);
+        }
+    }
+
+    private static class WindowStoreReadWriteDecorator<K, V> extends StateStoreReadWriteDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
+        WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
+            super(inner);
+        }
+
+        @Override
+        public void put(final K key, final V value) {
+            wrapped().put(key, value);
+        }
+
+        @Override
+        public void put(final K key, final V value, final long windowStartTimestamp) {
+            wrapped().put(key, value, windowStartTimestamp);
+        }
+
+        @Override
+        public V fetch(final K key, final long time) {
+            return wrapped().fetch(key, time);
+        }
+
+        @Deprecated
+        @Override
+        public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+            return wrapped().fetch(key, timeFrom, timeTo);
+        }
+
+        @Deprecated
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
+            return wrapped().fetch(from, to, timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> all() {
+            return wrapped().all();
+        }
+
+        @Deprecated
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+            return wrapped().fetchAll(timeFrom, timeTo);
+        }
+    }
+
+    private static class SessionStoreReadWriteDecorator<K, AGG> extends StateStoreReadWriteDecorator<SessionStore<K, AGG>> implements SessionStore<K, AGG> {
+        SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
+            super(inner);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
+            return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
+            return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
+        }
+
+        @Override
+        public void remove(final Windowed<K> sessionKey) {
+            wrapped().remove(sessionKey);
+        }
+
+        @Override
+        public void put(final Windowed<K> sessionKey, final AGG aggregate) {
+            wrapped().put(sessionKey, aggregate);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+            return wrapped().fetch(key);
         }
 
         @Override
         public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
-            return underlying.fetch(from, to);
+            return wrapped().fetch(from, to);
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index 38f966e..570c465 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -41,7 +41,7 @@ public interface WrappedStateStore extends StateStore {
     abstract class AbstractStateStore implements WrappedStateStore {
         final StateStore innerState;
 
-        AbstractStateStore(final StateStore inner) {
+        protected AbstractStateStore(final StateStore inner) {
             this.innerState = inner;
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index fa5f597..f956e0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.function.Consumer;
 
@@ -38,8 +39,9 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.junit.Before;
 import org.junit.Test;
 
-import static java.util.Collections.emptySet;
+import static java.util.Arrays.asList;
 import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
@@ -56,8 +58,14 @@ public class ProcessorContextImplTest {
     private static final long VAL = 42L;
     private static final String STORE_NAME = "underlying-store";
 
-    private boolean initExecuted;
-    private boolean closeExecuted;
+    private boolean flushExecuted;
+    private boolean putExecuted;
+    private boolean putIfAbsentExecuted;
+    private boolean putAllExecuted;
+    private boolean deleteExecuted;
+    private boolean removeExecuted;
+    private boolean put3argExecuted;
+
     private KeyValueIterator<String, Long> rangeIter;
     private KeyValueIterator<String, Long> allIter;
 
@@ -66,6 +74,14 @@ public class ProcessorContextImplTest {
 
     @Before
     public void setup() {
+        flushExecuted = false;
+        putExecuted = false;
+        putIfAbsentExecuted = false;
+        putAllExecuted = false;
+        deleteExecuted = false;
+        removeExecuted = false;
+        put3argExecuted = false;
+
         rangeIter = mock(KeyValueIterator.class);
         allIter = mock(KeyValueIterator.class);
         windowStoreIter = mock(WindowStoreIterator.class);
@@ -82,9 +98,14 @@ public class ProcessorContextImplTest {
 
         final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
 
-        expect(stateManager.getGlobalStore("KeyValueStore")).andReturn(keyValueStoreMock());
-        expect(stateManager.getGlobalStore("WindowStore")).andReturn(windowStoreMock());
-        expect(stateManager.getGlobalStore("SessionStore")).andReturn(sessionStoreMock());
+        expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock());
+        expect(stateManager.getGlobalStore("GlobalWindowStore")).andReturn(windowStoreMock());
+        expect(stateManager.getGlobalStore("GlobalSessionStore")).andReturn(sessionStoreMock());
+        expect(stateManager.getGlobalStore(anyString())).andReturn(null);
+
+        expect(stateManager.getStore("LocalKeyValueStore")).andReturn(keyValueStoreMock());
+        expect(stateManager.getStore("LocalWindowStore")).andReturn(windowStoreMock());
+        expect(stateManager.getStore("LocalSessionStore")).andReturn(sessionStoreMock());
 
         replay(stateManager);
 
@@ -98,16 +119,20 @@ public class ProcessorContextImplTest {
             mock(ThreadCache.class)
         );
 
-        context.setCurrentNode(new ProcessorNode<String, Long>("fake", null, emptySet()));
+        context.setCurrentNode(new ProcessorNode<String, Long>("fake", null,
+            new HashSet<>(asList("LocalKeyValueStore", "LocalWindowStore", "LocalSessionStore"))));
     }
 
     @Test
-    public void testKeyValueStore() {
-        doTest("KeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> {
-            checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put");
-            checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", 1L), "putIfAbsent");
-            checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll");
-            checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete");
+    public void globalKeyValueStoreShouldBeReadOnly() {
+        doTest("GlobalKeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> {
+            verifyStoreCannotBeInitializedOrClosed(store);
+
+            checkThrowsUnsupportedOperation(store::flush, "flush()");
+            checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put()");
+            checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", 1L), "putIfAbsent()");
+            checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll()");
+            checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete()");
 
             assertEquals((Long) VAL, store.get(KEY));
             assertEquals(rangeIter, store.range("one", "two"));
@@ -117,10 +142,13 @@ public class ProcessorContextImplTest {
     }
 
     @Test
-    public void testWindowStore() {
-        doTest("WindowStore", (Consumer<WindowStore<String, Long>>) store -> {
-            checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put");
-            checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put");
+    public void globalWindowStoreShouldBeReadOnly() {
+        doTest("GlobalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
+            verifyStoreCannotBeInitializedOrClosed(store);
+
+            checkThrowsUnsupportedOperation(store::flush, "flush()");
+            checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put()");
+            checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put()");
 
             assertEquals(iters.get(0), store.fetchAll(0L, 0L));
             assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
@@ -131,10 +159,13 @@ public class ProcessorContextImplTest {
     }
 
     @Test
-    public void testSessionStore() {
-        doTest("SessionStore", (Consumer<SessionStore<String, Long>>) store -> {
-            checkThrowsUnsupportedOperation(() -> store.remove(null), "remove");
-            checkThrowsUnsupportedOperation(() -> store.put(null, null), "put");
+    public void globalSessionStoreShouldBeReadOnly() {
+        doTest("GlobalSessionStore", (Consumer<SessionStore<String, Long>>) store -> {
+            verifyStoreCannotBeInitializedOrClosed(store);
+
+            checkThrowsUnsupportedOperation(store::flush, "flush()");
+            checkThrowsUnsupportedOperation(() -> store.remove(null), "remove()");
+            checkThrowsUnsupportedOperation(() -> store.put(null, null), "put()");
 
             assertEquals(iters.get(3), store.findSessions(KEY, 1L, 2L));
             assertEquals(iters.get(4), store.findSessions(KEY, KEY, 1L, 2L));
@@ -143,6 +174,77 @@ public class ProcessorContextImplTest {
         });
     }
 
+    @Test
+    public void localKeyValueStoreShouldNotAllowInitOrClose() {
+        doTest("LocalKeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> {
+            verifyStoreCannotBeInitializedOrClosed(store);
+
+            store.flush();
+            assertTrue(flushExecuted);
+
+            store.put("1", 1L);
+            assertTrue(putExecuted);
+
+            store.putIfAbsent("1", 1L);
+            assertTrue(putIfAbsentExecuted);
+
+            store.putAll(Collections.emptyList());
+            assertTrue(putAllExecuted);
+
+            store.delete("1");
+            assertTrue(deleteExecuted);
+
+            assertEquals((Long) VAL, store.get(KEY));
+            assertEquals(rangeIter, store.range("one", "two"));
+            assertEquals(allIter, store.all());
+            assertEquals(VAL, store.approximateNumEntries());
+        });
+    }
+
+    @Test
+    public void localWindowStoreShouldNotAllowInitOrClose() {
+        doTest("LocalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
+            verifyStoreCannotBeInitializedOrClosed(store);
+
+            store.flush();
+            assertTrue(flushExecuted);
+
+            store.put("1", 1L);
+            assertTrue(putExecuted);
+
+            store.put("1", 1L, 1L);
+            assertTrue(put3argExecuted);
+
+            assertEquals(iters.get(0), store.fetchAll(0L, 0L));
+            assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
+            assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L));
+            assertEquals((Long) VAL, store.fetch(KEY, 1L));
+            assertEquals(iters.get(2), store.all());
+        });
+    }
+
+    @Test
+    public void localSessionStoreShouldNotAllowInitOrClose() {
+        doTest("LocalSessionStore", (Consumer<SessionStore<String, Long>>) store -> {
+            verifyStoreCannotBeInitializedOrClosed(store);
+
+            store.flush();
+            assertTrue(flushExecuted);
+
+            store.remove(null);
+            assertTrue(removeExecuted);
+
+            store.put(null, null);
+            assertTrue(putExecuted);
+
+            assertEquals(iters.get(3), store.findSessions(KEY, 1L, 2L));
+            assertEquals(iters.get(4), store.findSessions(KEY, KEY, 1L, 2L));
+            assertEquals(iters.get(5), store.fetch(KEY));
+            assertEquals(iters.get(6), store.fetch(KEY, KEY));
+        });
+    }
+
+    @SuppressWarnings("unchecked")
     private KeyValueStore<String, Long> keyValueStoreMock() {
         final KeyValueStore<String, Long> keyValueStoreMock = mock(KeyValueStore.class);
 
@@ -154,6 +256,31 @@ public class ProcessorContextImplTest {
         expect(keyValueStoreMock.range("one", "two")).andReturn(rangeIter);
         expect(keyValueStoreMock.all()).andReturn(allIter);
 
+
+        keyValueStoreMock.put(anyString(), anyLong());
+        expectLastCall().andAnswer(() -> {
+            putExecuted = true;
+            return null;
+        });
+
+        keyValueStoreMock.putIfAbsent(anyString(), anyLong());
+        expectLastCall().andAnswer(() -> {
+            putIfAbsentExecuted = true;
+            return null;
+        });
+
+        keyValueStoreMock.putAll(anyObject(List.class));
+        expectLastCall().andAnswer(() -> {
+            putAllExecuted = true;
+            return null;
+        });
+
+        keyValueStoreMock.delete(anyString());
+        expectLastCall().andAnswer(() -> {
+            deleteExecuted = true;
+            return null;
+        });
+
         replay(keyValueStoreMock);
 
         return keyValueStoreMock;
@@ -170,11 +297,24 @@ public class ProcessorContextImplTest {
         expect(windowStore.fetch(anyString(), anyLong())).andReturn(VAL);
         expect(windowStore.all()).andReturn(iters.get(2));
 
+        windowStore.put(anyString(), anyLong());
+        expectLastCall().andAnswer(() -> {
+            putExecuted = true;
+            return null;
+        });
+
+        windowStore.put(anyString(), anyLong(), anyLong());
+        expectLastCall().andAnswer(() -> {
+            put3argExecuted = true;
+            return null;
+        });
+
         replay(windowStore);
 
         return windowStore;
     }
 
+    @SuppressWarnings("unchecked")
     private SessionStore<String, Long> sessionStoreMock() {
         final SessionStore<String, Long> sessionStore = mock(SessionStore.class);
 
@@ -185,25 +325,31 @@ public class ProcessorContextImplTest {
         expect(sessionStore.fetch(anyString())).andReturn(iters.get(5));
         expect(sessionStore.fetch(anyString(), anyString())).andReturn(iters.get(6));
 
+        sessionStore.put(anyObject(Windowed.class), anyLong());
+        expectLastCall().andAnswer(() -> {
+            putExecuted = true;
+            return null;
+        });
+
+        sessionStore.remove(anyObject(Windowed.class));
+        expectLastCall().andAnswer(() -> {
+            removeExecuted = true;
+            return null;
+        });
+
         replay(sessionStore);
 
         return sessionStore;
     }
 
-    private void initStateStoreMock(final StateStore windowStore) {
-        expect(windowStore.name()).andReturn(STORE_NAME);
-        expect(windowStore.persistent()).andReturn(true);
-        expect(windowStore.isOpen()).andReturn(true);
-
-        windowStore.init(null, null);
-        expectLastCall().andAnswer(() -> {
-            initExecuted = true;
-            return null;
-        });
+    private void initStateStoreMock(final StateStore stateStore) {
+        expect(stateStore.name()).andReturn(STORE_NAME);
+        expect(stateStore.persistent()).andReturn(true);
+        expect(stateStore.isOpen()).andReturn(true);
 
-        windowStore.close();
+        stateStore.flush();
         expectLastCall().andAnswer(() -> {
-            closeExecuted = true;
+            flushExecuted = true;
             return null;
         });
     }
@@ -215,8 +361,6 @@ public class ProcessorContextImplTest {
             public void init(final ProcessorContext context) {
                 final T store = (T) context.getStateStore(name);
 
-                checkStateStoreMethods(store);
-
                 checker.accept(store);
 
             }
@@ -235,18 +379,13 @@ public class ProcessorContextImplTest {
         processor.init(context);
     }
 
-    private void checkStateStoreMethods(final StateStore store) {
-        checkThrowsUnsupportedOperation(store::flush, "flush");
-
+    private void verifyStoreCannotBeInitializedOrClosed(final StateStore store) {
         assertEquals(STORE_NAME, store.name());
         assertTrue(store.persistent());
         assertTrue(store.isOpen());
 
-        store.init(null, null);
-        assertTrue(initExecuted);
-
-        store.close();
-        assertTrue(closeExecuted);
+        checkThrowsUnsupportedOperation(() -> store.init(null, null), "init()");
+        checkThrowsUnsupportedOperation(store::close, "close()");
     }
 
     private void checkThrowsUnsupportedOperation(final Runnable check, final String name) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 11050fe..14b94da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -662,11 +662,6 @@ public class ProcessorTopologyTest {
         public void process(final String key, final String value) {
             store.put(key, value);
         }
-
-        @Override
-        public void close() {
-            store.close();
-        }
     }
 
     private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) {