You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/11 09:44:28 UTC
[kafka] branch trunk updated: KAFKA-6970: All standard state stores
guarded with read only wrapper (#6016)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c142809 KAFKA-6970: All standard state stores guarded with read only wrapper (#6016)
c142809 is described below
commit c142809038e67625638165462ceefd3f1191e0e9
Author: Nikolay <ni...@apache.org>
AuthorDate: Tue Dec 11 12:44:18 2018 +0300
KAFKA-6970: All standard state stores guarded with read only wrapper (#6016)
Reviewer: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../streams/kstream/internals/TupleForwarder.java | 15 +-
.../processor/internals/ProcessorContextImpl.java | 232 +++++++++++++++++----
.../streams/state/internals/WrappedStateStore.java | 2 +-
.../internals/ProcessorContextImplTest.java | 225 ++++++++++++++++----
.../processor/internals/ProcessorTopologyTest.java | 5 -
5 files changed, 385 insertions(+), 94 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index aec0d16..127057f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -48,9 +48,18 @@ class TupleForwarder<K, V> {
private CachedStateStore cachedStateStore(final StateStore store) {
if (store instanceof CachedStateStore) {
return (CachedStateStore) store;
- } else if (store instanceof WrappedStateStore
- && ((WrappedStateStore) store).wrappedStore() instanceof CachedStateStore) {
- return (CachedStateStore) ((WrappedStateStore) store).wrappedStore();
+ } else if (store instanceof WrappedStateStore) {
+ StateStore wrapped = ((WrappedStateStore) store).wrappedStore();
+
+ while (wrapped instanceof WrappedStateStore && !(wrapped instanceof CachedStateStore)) {
+ wrapped = ((WrappedStateStore) wrapped).wrappedStore();
+ }
+
+ if (!(wrapped instanceof CachedStateStore)) {
+ return null;
+ }
+
+ return (CachedStateStore) wrapped;
}
return null;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c79ec35..e7dd4db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
@@ -37,6 +38,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import java.time.Duration;
import java.util.List;
+import org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
@@ -102,7 +104,16 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
"please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
}
- return stateManager.getStore(name);
+ final StateStore store = stateManager.getStore(name);
+ if (store instanceof KeyValueStore) {
+ return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
+ } else if (store instanceof WindowStore) {
+ return new WindowStoreReadWriteDecorator((WindowStore) store);
+ } else if (store instanceof SessionStore) {
+ return new SessionStoreReadWriteDecorator((SessionStore) store);
+ }
+
+ return store;
}
@SuppressWarnings("unchecked")
@@ -196,23 +207,16 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
return streamTimeSupplier.get();
}
- private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> implements StateStore {
+ private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> extends AbstractStateStore {
static final String ERROR_MESSAGE = "Global store is read only";
- final T underlying;
-
- StateStoreReadOnlyDecorator(final T underlying) {
- this.underlying = underlying;
- }
-
- @Override
- public String name() {
- return underlying.name();
+ StateStoreReadOnlyDecorator(final T inner) {
+ super(inner);
}
- @Override
- public void init(final ProcessorContext context, final StateStore root) {
- underlying.init(context, root);
+ @SuppressWarnings("unchecked")
+ T getInner() {
+ return (T) wrappedStore();
}
@Override
@@ -221,44 +225,39 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
@Override
- public void close() {
- underlying.close();
- }
-
- @Override
- public boolean persistent() {
- return underlying.persistent();
+ public void init(final ProcessorContext context, final StateStore root) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
- public boolean isOpen() {
- return underlying.isOpen();
+ public void close() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
}
private static class KeyValueStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> {
- KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> underlying) {
- super(underlying);
+ KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) {
+ super(inner);
}
@Override
public V get(final K key) {
- return underlying.get(key);
+ return getInner().get(key);
}
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
- return underlying.range(from, to);
+ return getInner().range(from, to);
}
@Override
public KeyValueIterator<K, V> all() {
- return underlying.all();
+ return getInner().all();
}
@Override
public long approximateNumEntries() {
- return underlying.approximateNumEntries();
+ return getInner().approximateNumEntries();
}
@Override
@@ -283,8 +282,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
private static class WindowStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
- WindowStoreReadOnlyDecorator(final WindowStore<K, V> underlying) {
- super(underlying);
+ WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
+ super(inner);
}
@Override
@@ -299,46 +298,46 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@Override
public V fetch(final K key, final long time) {
- return underlying.fetch(key, time);
+ return getInner().fetch(key, time);
}
@Deprecated
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
- return underlying.fetch(key, timeFrom, timeTo);
+ return getInner().fetch(key, timeFrom, timeTo);
}
@Deprecated
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
- return underlying.fetch(from, to, timeFrom, timeTo);
+ return getInner().fetch(from, to, timeFrom, timeTo);
}
@Override
public KeyValueIterator<Windowed<K>, V> all() {
- return underlying.all();
+ return getInner().all();
}
@Deprecated
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
- return underlying.fetchAll(timeFrom, timeTo);
+ return getInner().fetchAll(timeFrom, timeTo);
}
}
private static class SessionStoreReadOnlyDecorator<K, AGG> extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>> implements SessionStore<K, AGG> {
- SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> underlying) {
- super(underlying);
+ SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) {
+ super(inner);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
- return underlying.findSessions(key, earliestSessionEndTime, latestSessionStartTime);
+ return getInner().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
- return underlying.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
+ return getInner().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
}
@Override
@@ -353,12 +352,161 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
- return underlying.fetch(key);
+ return getInner().fetch(key);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
+ return getInner().fetch(from, to);
+ }
+ }
+
+ private abstract static class StateStoreReadWriteDecorator<T extends StateStore> extends AbstractStateStore {
+ static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";
+
+ StateStoreReadWriteDecorator(final T inner) {
+ super(inner);
+ }
+
+ @SuppressWarnings("unchecked")
+ T wrapped() {
+ return (T) super.wrappedStore();
+ }
+
+ @Override
+ public void init(final ProcessorContext context, final StateStore root) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public void close() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ }
+
+ private static class KeyValueStoreReadWriteDecorator<K, V> extends StateStoreReadWriteDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> {
+ KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
+ super(inner);
+ }
+
+ @Override
+ public V get(final K key) {
+ return wrapped().get(key);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(final K from, final K to) {
+ return wrapped().range(from, to);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return wrapped().all();
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return wrapped().approximateNumEntries();
+ }
+
+ @Override
+ public void put(final K key, final V value) {
+ wrapped().put(key, value);
+ }
+
+ @Override
+ public V putIfAbsent(final K key, final V value) {
+ return wrapped().putIfAbsent(key, value);
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<K, V>> entries) {
+ wrapped().putAll(entries);
+ }
+
+ @Override
+ public V delete(final K key) {
+ return wrapped().delete(key);
+ }
+ }
+
+ private static class WindowStoreReadWriteDecorator<K, V> extends StateStoreReadWriteDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
+ WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
+ super(inner);
+ }
+
+ @Override
+ public void put(final K key, final V value) {
+ wrapped().put(key, value);
+ }
+
+ @Override
+ public void put(final K key, final V value, final long windowStartTimestamp) {
+ wrapped().put(key, value, windowStartTimestamp);
+ }
+
+ @Override
+ public V fetch(final K key, final long time) {
+ return wrapped().fetch(key, time);
+ }
+
+ @Deprecated
+ @Override
+ public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+ return wrapped().fetch(key, timeFrom, timeTo);
+ }
+
+ @Deprecated
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
+ return wrapped().fetch(from, to, timeFrom, timeTo);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> all() {
+ return wrapped().all();
+ }
+
+ @Deprecated
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+ return wrapped().fetchAll(timeFrom, timeTo);
+ }
+ }
+
+ private static class SessionStoreReadWriteDecorator<K, AGG> extends StateStoreReadWriteDecorator<SessionStore<K, AGG>> implements SessionStore<K, AGG> {
+ SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
+ super(inner);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
+ return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
+ return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
+ }
+
+ @Override
+ public void remove(final Windowed<K> sessionKey) {
+ wrapped().remove(sessionKey);
+ }
+
+ @Override
+ public void put(final Windowed<K> sessionKey, final AGG aggregate) {
+ wrapped().put(sessionKey, aggregate);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+ return wrapped().fetch(key);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
- return underlying.fetch(from, to);
+ return wrapped().fetch(from, to);
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index 38f966e..570c465 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -41,7 +41,7 @@ public interface WrappedStateStore extends StateStore {
abstract class AbstractStateStore implements WrappedStateStore {
final StateStore innerState;
- AbstractStateStore(final StateStore inner) {
+ protected AbstractStateStore(final StateStore inner) {
this.innerState = inner;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index fa5f597..f956e0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.function.Consumer;
@@ -38,8 +39,9 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import org.junit.Before;
import org.junit.Test;
-import static java.util.Collections.emptySet;
+import static java.util.Arrays.asList;
import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
@@ -56,8 +58,14 @@ public class ProcessorContextImplTest {
private static final long VAL = 42L;
private static final String STORE_NAME = "underlying-store";
- private boolean initExecuted;
- private boolean closeExecuted;
+ private boolean flushExecuted;
+ private boolean putExecuted;
+ private boolean putIfAbsentExecuted;
+ private boolean putAllExecuted;
+ private boolean deleteExecuted;
+ private boolean removeExecuted;
+ private boolean put3argExecuted;
+
private KeyValueIterator<String, Long> rangeIter;
private KeyValueIterator<String, Long> allIter;
@@ -66,6 +74,14 @@ public class ProcessorContextImplTest {
@Before
public void setup() {
+ flushExecuted = false;
+ putExecuted = false;
+ putIfAbsentExecuted = false;
+ putAllExecuted = false;
+ deleteExecuted = false;
+ removeExecuted = false;
+ put3argExecuted = false;
+
rangeIter = mock(KeyValueIterator.class);
allIter = mock(KeyValueIterator.class);
windowStoreIter = mock(WindowStoreIterator.class);
@@ -82,9 +98,14 @@ public class ProcessorContextImplTest {
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
- expect(stateManager.getGlobalStore("KeyValueStore")).andReturn(keyValueStoreMock());
- expect(stateManager.getGlobalStore("WindowStore")).andReturn(windowStoreMock());
- expect(stateManager.getGlobalStore("SessionStore")).andReturn(sessionStoreMock());
+ expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock());
+ expect(stateManager.getGlobalStore("GlobalWindowStore")).andReturn(windowStoreMock());
+ expect(stateManager.getGlobalStore("GlobalSessionStore")).andReturn(sessionStoreMock());
+ expect(stateManager.getGlobalStore(anyString())).andReturn(null);
+
+ expect(stateManager.getStore("LocalKeyValueStore")).andReturn(keyValueStoreMock());
+ expect(stateManager.getStore("LocalWindowStore")).andReturn(windowStoreMock());
+ expect(stateManager.getStore("LocalSessionStore")).andReturn(sessionStoreMock());
replay(stateManager);
@@ -98,16 +119,20 @@ public class ProcessorContextImplTest {
mock(ThreadCache.class)
);
- context.setCurrentNode(new ProcessorNode<String, Long>("fake", null, emptySet()));
+ context.setCurrentNode(new ProcessorNode<String, Long>("fake", null,
+ new HashSet<>(asList("LocalKeyValueStore", "LocalWindowStore", "LocalSessionStore"))));
}
@Test
- public void testKeyValueStore() {
- doTest("KeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> {
- checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put");
- checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", 1L), "putIfAbsent");
- checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll");
- checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete");
+ public void globalKeyValueStoreShouldBeReadOnly() {
+ doTest("GlobalKeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> {
+ verifyStoreCannotBeInitializedOrClosed(store);
+
+ checkThrowsUnsupportedOperation(store::flush, "flush()");
+ checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put()");
+ checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", 1L), "putIfAbsent()");
+ checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll()");
+ checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete()");
assertEquals((Long) VAL, store.get(KEY));
assertEquals(rangeIter, store.range("one", "two"));
@@ -117,10 +142,13 @@ public class ProcessorContextImplTest {
}
@Test
- public void testWindowStore() {
- doTest("WindowStore", (Consumer<WindowStore<String, Long>>) store -> {
- checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put");
- checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put");
+ public void globalWindowStoreShouldBeReadOnly() {
+ doTest("GlobalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
+ verifyStoreCannotBeInitializedOrClosed(store);
+
+ checkThrowsUnsupportedOperation(store::flush, "flush()");
+ checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put()");
+ checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put()");
assertEquals(iters.get(0), store.fetchAll(0L, 0L));
assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
@@ -131,10 +159,13 @@ public class ProcessorContextImplTest {
}
@Test
- public void testSessionStore() {
- doTest("SessionStore", (Consumer<SessionStore<String, Long>>) store -> {
- checkThrowsUnsupportedOperation(() -> store.remove(null), "remove");
- checkThrowsUnsupportedOperation(() -> store.put(null, null), "put");
+ public void globalSessionStoreShouldBeReadOnly() {
+ doTest("GlobalSessionStore", (Consumer<SessionStore<String, Long>>) store -> {
+ verifyStoreCannotBeInitializedOrClosed(store);
+
+ checkThrowsUnsupportedOperation(store::flush, "flush()");
+ checkThrowsUnsupportedOperation(() -> store.remove(null), "remove()");
+ checkThrowsUnsupportedOperation(() -> store.put(null, null), "put()");
assertEquals(iters.get(3), store.findSessions(KEY, 1L, 2L));
assertEquals(iters.get(4), store.findSessions(KEY, KEY, 1L, 2L));
@@ -143,6 +174,77 @@ public class ProcessorContextImplTest {
});
}
+ @Test
+ public void localKeyValueStoreShouldNotAllowInitOrClose() {
+ doTest("LocalKeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> {
+ verifyStoreCannotBeInitializedOrClosed(store);
+
+ store.flush();
+ assertTrue(flushExecuted);
+
+ store.put("1", 1L);
+ assertTrue(putExecuted);
+
+ store.putIfAbsent("1", 1L);
+ assertTrue(putIfAbsentExecuted);
+
+ store.putAll(Collections.emptyList());
+ assertTrue(putAllExecuted);
+
+ store.delete("1");
+ assertTrue(deleteExecuted);
+
+ assertEquals((Long) VAL, store.get(KEY));
+ assertEquals(rangeIter, store.range("one", "two"));
+ assertEquals(allIter, store.all());
+ assertEquals(VAL, store.approximateNumEntries());
+ });
+ }
+
+ @Test
+ public void localWindowStoreShouldNotAllowInitOrClose() {
+ doTest("LocalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
+ verifyStoreCannotBeInitializedOrClosed(store);
+
+ store.flush();
+ assertTrue(flushExecuted);
+
+ store.put("1", 1L);
+ assertTrue(putExecuted);
+
+ store.put("1", 1L, 1L);
+ assertTrue(put3argExecuted);
+
+ assertEquals(iters.get(0), store.fetchAll(0L, 0L));
+ assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
+ assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L));
+ assertEquals((Long) VAL, store.fetch(KEY, 1L));
+ assertEquals(iters.get(2), store.all());
+ });
+ }
+
+ @Test
+ public void localSessionStoreShouldNotAllowInitOrClose() {
+ doTest("LocalSessionStore", (Consumer<SessionStore<String, Long>>) store -> {
+ verifyStoreCannotBeInitializedOrClosed(store);
+
+ store.flush();
+ assertTrue(flushExecuted);
+
+ store.remove(null);
+ assertTrue(removeExecuted);
+
+ store.put(null, null);
+ assertTrue(putExecuted);
+
+ assertEquals(iters.get(3), store.findSessions(KEY, 1L, 2L));
+ assertEquals(iters.get(4), store.findSessions(KEY, KEY, 1L, 2L));
+ assertEquals(iters.get(5), store.fetch(KEY));
+ assertEquals(iters.get(6), store.fetch(KEY, KEY));
+ });
+ }
+
+ @SuppressWarnings("unchecked")
private KeyValueStore<String, Long> keyValueStoreMock() {
final KeyValueStore<String, Long> keyValueStoreMock = mock(KeyValueStore.class);
@@ -154,6 +256,31 @@ public class ProcessorContextImplTest {
expect(keyValueStoreMock.range("one", "two")).andReturn(rangeIter);
expect(keyValueStoreMock.all()).andReturn(allIter);
+
+ keyValueStoreMock.put(anyString(), anyLong());
+ expectLastCall().andAnswer(() -> {
+ putExecuted = true;
+ return null;
+ });
+
+ keyValueStoreMock.putIfAbsent(anyString(), anyLong());
+ expectLastCall().andAnswer(() -> {
+ putIfAbsentExecuted = true;
+ return null;
+ });
+
+ keyValueStoreMock.putAll(anyObject(List.class));
+ expectLastCall().andAnswer(() -> {
+ putAllExecuted = true;
+ return null;
+ });
+
+ keyValueStoreMock.delete(anyString());
+ expectLastCall().andAnswer(() -> {
+ deleteExecuted = true;
+ return null;
+ });
+
replay(keyValueStoreMock);
return keyValueStoreMock;
@@ -170,11 +297,24 @@ public class ProcessorContextImplTest {
expect(windowStore.fetch(anyString(), anyLong())).andReturn(VAL);
expect(windowStore.all()).andReturn(iters.get(2));
+ windowStore.put(anyString(), anyLong());
+ expectLastCall().andAnswer(() -> {
+ putExecuted = true;
+ return null;
+ });
+
+ windowStore.put(anyString(), anyLong(), anyLong());
+ expectLastCall().andAnswer(() -> {
+ put3argExecuted = true;
+ return null;
+ });
+
replay(windowStore);
return windowStore;
}
+ @SuppressWarnings("unchecked")
private SessionStore<String, Long> sessionStoreMock() {
final SessionStore<String, Long> sessionStore = mock(SessionStore.class);
@@ -185,25 +325,31 @@ public class ProcessorContextImplTest {
expect(sessionStore.fetch(anyString())).andReturn(iters.get(5));
expect(sessionStore.fetch(anyString(), anyString())).andReturn(iters.get(6));
+ sessionStore.put(anyObject(Windowed.class), anyLong());
+ expectLastCall().andAnswer(() -> {
+ putExecuted = true;
+ return null;
+ });
+
+ sessionStore.remove(anyObject(Windowed.class));
+ expectLastCall().andAnswer(() -> {
+ removeExecuted = true;
+ return null;
+ });
+
replay(sessionStore);
return sessionStore;
}
- private void initStateStoreMock(final StateStore windowStore) {
- expect(windowStore.name()).andReturn(STORE_NAME);
- expect(windowStore.persistent()).andReturn(true);
- expect(windowStore.isOpen()).andReturn(true);
-
- windowStore.init(null, null);
- expectLastCall().andAnswer(() -> {
- initExecuted = true;
- return null;
- });
+ private void initStateStoreMock(final StateStore stateStore) {
+ expect(stateStore.name()).andReturn(STORE_NAME);
+ expect(stateStore.persistent()).andReturn(true);
+ expect(stateStore.isOpen()).andReturn(true);
- windowStore.close();
+ stateStore.flush();
expectLastCall().andAnswer(() -> {
- closeExecuted = true;
+ flushExecuted = true;
return null;
});
}
@@ -215,8 +361,6 @@ public class ProcessorContextImplTest {
public void init(final ProcessorContext context) {
final T store = (T) context.getStateStore(name);
- checkStateStoreMethods(store);
-
checker.accept(store);
}
@@ -235,18 +379,13 @@ public class ProcessorContextImplTest {
processor.init(context);
}
- private void checkStateStoreMethods(final StateStore store) {
- checkThrowsUnsupportedOperation(store::flush, "flush");
-
+ private void verifyStoreCannotBeInitializedOrClosed(final StateStore store) {
assertEquals(STORE_NAME, store.name());
assertTrue(store.persistent());
assertTrue(store.isOpen());
- store.init(null, null);
- assertTrue(initExecuted);
-
- store.close();
- assertTrue(closeExecuted);
+ checkThrowsUnsupportedOperation(() -> store.init(null, null), "init()");
+ checkThrowsUnsupportedOperation(store::close, "close()");
}
private void checkThrowsUnsupportedOperation(final Runnable check, final String name) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 11050fe..14b94da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -662,11 +662,6 @@ public class ProcessorTopologyTest {
public void process(final String key, final String value) {
store.put(key, value);
}
-
- @Override
- public void close() {
- store.close();
- }
}
private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) {