You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/07 17:49:49 UTC
[kafka] branch trunk updated: KAFKA-3522: Interactive Queries must
return timestamped stores (#6661)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a6d5efa KAFKA-3522: Interactive Queries must return timestamped stores (#6661)
a6d5efa is described below
commit a6d5efaf0d06f8a66350a8f1b959baf176fd482a
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Tue May 7 19:49:31 2019 +0200
KAFKA-3522: Interactive Queries must return timestamped stores (#6661)
Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
.../kafka/streams/state/QueryableStoreTypes.java | 30 +++--
.../state/internals/GlobalStateStoreProvider.java | 8 ++
.../internals/StreamThreadStateStoreProvider.java | 12 +-
.../internals/GlobalStateStoreProviderTest.java | 135 ++++++++++++++++++++-
.../StreamThreadStateStoreProviderTest.java | 122 +++++++++++++++++--
.../org/apache/kafka/test/NoOpReadOnlyStore.java | 5 +-
6 files changed, 285 insertions(+), 27 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index 7b1e8b3..d4e9e89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -24,6 +24,11 @@ import org.apache.kafka.streams.state.internals.CompositeReadOnlySessionStore;
import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
/**
* Provides access to the {@link QueryableStoreType}s provided with {@link KafkaStreams}.
* These can be used with {@link KafkaStreams#store(String, QueryableStoreType)}.
@@ -88,23 +93,28 @@ public final class QueryableStoreTypes {
private static abstract class QueryableStoreTypeMatcher<T> implements QueryableStoreType<T> {
- private final Class matchTo;
+ private final Set<Class> matchTo;
- QueryableStoreTypeMatcher(final Class matchTo) {
+ QueryableStoreTypeMatcher(final Set<Class> matchTo) {
this.matchTo = matchTo;
}
@SuppressWarnings("unchecked")
@Override
public boolean accepts(final StateStore stateStore) {
- return matchTo.isAssignableFrom(stateStore.getClass());
+ for (final Class matchToClass : matchTo) {
+ if (!matchToClass.isAssignableFrom(stateStore.getClass())) {
+ return false;
+ }
+ }
+ return true;
}
}
public static class KeyValueStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
KeyValueStoreType() {
- super(ReadOnlyKeyValueStore.class);
+ super(Collections.singleton(ReadOnlyKeyValueStore.class));
}
@Override
@@ -119,7 +129,9 @@ public final class QueryableStoreTypes {
extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> {
TimestampedKeyValueStoreType() {
- super(ReadOnlyKeyValueStore.class);
+ super(new HashSet<>(Arrays.asList(
+ TimestampedKeyValueStore.class,
+ ReadOnlyKeyValueStore.class)));
}
@Override
@@ -132,7 +144,7 @@ public final class QueryableStoreTypes {
public static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
WindowStoreType() {
- super(ReadOnlyWindowStore.class);
+ super(Collections.singleton(ReadOnlyWindowStore.class));
}
@Override
@@ -146,7 +158,9 @@ public final class QueryableStoreTypes {
extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> {
TimestampedWindowStoreType() {
- super(ReadOnlyWindowStore.class);
+ super(new HashSet<>(Arrays.asList(
+ TimestampedWindowStore.class,
+ ReadOnlyWindowStore.class)));
}
@Override
@@ -159,7 +173,7 @@ public final class QueryableStoreTypes {
public static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {
SessionStoreType() {
- super(ReadOnlySessionStore.class);
+ super(Collections.singleton(ReadOnlySessionStore.class));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
index 0db69d0..057a836 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
@@ -19,6 +19,9 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
import java.util.Collections;
import java.util.List;
@@ -41,6 +44,11 @@ public class GlobalStateStoreProvider implements StateStoreProvider {
if (!store.isOpen()) {
throw new InvalidStateStoreException("the state store, " + storeName + ", is not open.");
}
+ if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
+ return (List<T>) Collections.singletonList(new ReadOnlyKeyValueStoreFacade((TimestampedKeyValueStore<Object, Object>) store));
+ } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
+ return (List<T>) Collections.singletonList(new ReadOnlyWindowStoreFacade((TimestampedWindowStore<Object, Object>) store));
+ }
return (List<T>) Collections.singletonList(store);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index e097963..53c5cc0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -21,6 +21,9 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
import java.util.ArrayList;
import java.util.Collections;
@@ -37,7 +40,6 @@ public class StreamThreadStateStoreProvider implements StateStoreProvider {
this.streamThread = streamThread;
}
-
@SuppressWarnings("unchecked")
@Override
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
@@ -56,7 +58,13 @@ public class StreamThreadStateStoreProvider implements StateStoreProvider {
throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask +
" because the store is not open. The state store may have migrated to another instances.");
}
- stores.add((T) store);
+ if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
+ stores.add((T) new ReadOnlyKeyValueStoreFacade((TimestampedKeyValueStore<Object, Object>) store));
+ } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
+ stores.add((T) new ReadOnlyWindowStoreFacade((TimestampedWindowStore<Object, Object>) store));
+ } else {
+ stores.add((T) store);
+ }
}
}
return stores;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 8b77b37..9c76e14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -16,35 +16,103 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.NoOpReadOnlyStore;
+import org.junit.Before;
import org.junit.Test;
+import java.time.Duration;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class GlobalStateStoreProviderTest {
+ private final Map<String, StateStore> stores = new HashMap<>();
+
+ @Before
+ public void before() {
+ stores.put(
+ "kv-store",
+ Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("kv-store"),
+ Serdes.String(),
+ Serdes.String()).build());
+ stores.put(
+ "ts-kv-store",
+ Stores.timestampedKeyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("ts-kv-store"),
+ Serdes.String(),
+ Serdes.String()).build());
+ stores.put(
+ "w-store",
+ Stores.windowStoreBuilder(
+ Stores.inMemoryWindowStore(
+ "w-store",
+ Duration.ofMillis(10L),
+ Duration.ofMillis(2L),
+ false),
+ Serdes.String(),
+ Serdes.String()).build());
+ stores.put(
+ "ts-w-store",
+ Stores.timestampedWindowStoreBuilder(
+ Stores.inMemoryWindowStore(
+ "ts-w-store",
+ Duration.ofMillis(10L),
+ Duration.ofMillis(2L),
+ false),
+ Serdes.String(),
+ Serdes.String()).build());
+
+ final ProcessorContextImpl mockContext = mock(ProcessorContextImpl.class);
+ expect(mockContext.applicationId()).andReturn("appId").anyTimes();
+ expect(mockContext.metrics()).andReturn(new StreamsMetricsImpl(new Metrics(), "threadName")).anyTimes();
+ expect(mockContext.taskId()).andReturn(new TaskId(0, 0)).anyTimes();
+ expect(mockContext.recordCollector()).andReturn(null).anyTimes();
+ replay(mockContext);
+ for (final StateStore store : stores.values()) {
+ store.init(mockContext, null);
+ }
+ }
@Test
public void shouldReturnSingleItemListIfStoreExists() {
final GlobalStateStoreProvider provider =
- new GlobalStateStoreProvider(Collections.<String, StateStore>singletonMap("global", new NoOpReadOnlyStore<>()));
- final List<ReadOnlyKeyValueStore<Object, Object>> stores = provider.stores("global", QueryableStoreTypes.keyValueStore());
+ new GlobalStateStoreProvider(Collections.singletonMap("global", new NoOpReadOnlyStore<>()));
+ final List<ReadOnlyKeyValueStore<Object, Object>> stores =
+ provider.stores("global", QueryableStoreTypes.keyValueStore());
assertEquals(stores.size(), 1);
}
@Test
public void shouldReturnEmptyItemListIfStoreDoesntExist() {
- final GlobalStateStoreProvider provider =
- new GlobalStateStoreProvider(Collections.<String, StateStore>emptyMap());
- final List<ReadOnlyKeyValueStore<Object, Object>> stores = provider.stores("global", QueryableStoreTypes.keyValueStore());
+ final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(Collections.emptyMap());
+ final List<ReadOnlyKeyValueStore<Object, Object>> stores =
+ provider.stores("global", QueryableStoreTypes.keyValueStore());
assertTrue(stores.isEmpty());
}
@@ -53,8 +121,63 @@ public class GlobalStateStoreProviderTest {
final NoOpReadOnlyStore<Object, Object> store = new NoOpReadOnlyStore<>();
store.close();
final GlobalStateStoreProvider provider =
- new GlobalStateStoreProvider(Collections.<String, StateStore>singletonMap("global", store));
+ new GlobalStateStoreProvider(Collections.singletonMap("global", store));
provider.stores("global", QueryableStoreTypes.keyValueStore());
}
+ @Test
+ public void shouldReturnKeyValueStore() {
+ final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
+ final List<ReadOnlyKeyValueStore<String, String>> stores =
+ provider.stores("kv-store", QueryableStoreTypes.keyValueStore());
+ assertEquals(1, stores.size());
+ for (final ReadOnlyKeyValueStore<String, String> store : stores) {
+ assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+ assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+ }
+ }
+
+ @Test
+ public void shouldReturnTimestampedKeyValueStore() {
+ final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
+ final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> stores =
+ provider.stores("ts-kv-store", QueryableStoreTypes.timestampedKeyValueStore());
+ assertEquals(1, stores.size());
+ for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store : stores) {
+ assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+ assertThat(store, instanceOf(TimestampedKeyValueStore.class));
+ }
+ }
+
+ @Test
+ public void shouldNotReturnKeyValueStoreAsTimestampedStore() {
+ final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
+ final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> stores =
+ provider.stores("kv-store", QueryableStoreTypes.timestampedKeyValueStore());
+ assertEquals(0, stores.size());
+ }
+
+ @Test
+ public void shouldReturnTimestampedKeyValueStoreAsKeyValueStore() {
+ final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
+ final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> stores =
+ provider.stores("ts-kv-store", QueryableStoreTypes.keyValueStore());
+ assertEquals(1, stores.size());
+ for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store : stores) {
+ assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+ assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+ }
+ }
+
+ @Test
+ public void shouldReturnTimestampedWindowStoreAsWindowStore() {
+ final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores);
+ final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> stores =
+ provider.stores("ts-w-store", QueryableStoreTypes.windowStore());
+ assertEquals(1, stores.size());
+ for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> store : stores) {
+ assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+ assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+ }
+ }
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a10b62d..da2d46d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -38,6 +38,9 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
@@ -57,7 +60,11 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.timestampedWindowStore;
import static org.apache.kafka.streams.state.QueryableStoreTypes.windowStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
public class StreamThreadStateStoreProviderTest {
@@ -82,16 +89,31 @@ public class StreamThreadStateStoreProviderTest {
Serdes.String()),
"the-processor");
topology.addStateStore(
+ Stores.timestampedKeyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("timestamped-kv-store"),
+ Serdes.String(),
+ Serdes.String()),
+ "the-processor");
+ topology.addStateStore(
Stores.windowStoreBuilder(
- Stores.persistentWindowStore(
+ Stores.inMemoryWindowStore(
"window-store",
Duration.ofMillis(10L),
Duration.ofMillis(2L),
false),
Serdes.String(),
Serdes.String()),
- "the-processor"
- );
+ "the-processor");
+ topology.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.inMemoryWindowStore(
+ "timestamped-window-store",
+ Duration.ofMillis(10L),
+ Duration.ofMillis(2L),
+ false),
+ Serdes.String(),
+ Serdes.String()),
+ "the-processor");
final Properties properties = new Properties();
final String applicationId = "applicationId";
@@ -142,14 +164,100 @@ public class StreamThreadStateStoreProviderTest {
final List<ReadOnlyKeyValueStore<String, String>> kvStores =
provider.stores("kv-store", QueryableStoreTypes.keyValueStore());
assertEquals(2, kvStores.size());
+ for (final ReadOnlyKeyValueStore<String, String> store: kvStores) {
+ assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+ assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+ }
+ }
+
+ @Test
+ public void shouldFindTimestampedKeyValueStores() {
+ mockThread(true);
+ final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores =
+ provider.stores("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore());
+ assertEquals(2, tkvStores.size());
+ for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store: tkvStores) {
+ assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+ assertThat(store, instanceOf(TimestampedKeyValueStore.class));
+ }
+ }
+
+ @Test
+ public void shouldNotFindKeyValueStoresAsTimestampedStore() {
+ mockThread(true);
+ final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores =
+ provider.stores("kv-store", QueryableStoreTypes.timestampedKeyValueStore());
+ assertEquals(0, tkvStores.size());
+ }
+
+ @Test
+ public void shouldFindTimestampedKeyValueStoresAsKeyValueStores() {
+ mockThread(true);
+ final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> tkvStores =
+ provider.stores("timestamped-kv-store", QueryableStoreTypes.keyValueStore());
+ assertEquals(2, tkvStores.size());
+ for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store: tkvStores) {
+ assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+ assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+ }
}
@Test
public void shouldFindWindowStores() {
mockThread(true);
- final List<ReadOnlyWindowStore<Object, Object>> windowStores =
+ final List<ReadOnlyWindowStore<String, String>> windowStores =
provider.stores("window-store", windowStore());
assertEquals(2, windowStores.size());
+ for (final ReadOnlyWindowStore<String, String> store: windowStores) {
+ assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+ assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+ }
+ }
+
+ @Test
+ public void shouldFindTimestampedWindowStores() {
+ mockThread(true);
+ final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores =
+ provider.stores("timestamped-window-store", timestampedWindowStore());
+ assertEquals(2, windowStores.size());
+ for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> store: windowStores) {
+ assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+ assertThat(store, instanceOf(TimestampedWindowStore.class));
+ }
+ }
+
+ @Test
+ public void shouldNotFindWindowStoresAsTimestampedStore() {
+ mockThread(true);
+ final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores =
+ provider.stores("window-store", timestampedWindowStore());
+ assertEquals(0, windowStores.size());
+ }
+
+ @Test
+ public void shouldFindTimestampedWindowStoresAsWindowStore() {
+ mockThread(true);
+ final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> windowStores =
+ provider.stores("timestamped-window-store", windowStore());
+ assertEquals(2, windowStores.size());
+ for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> store: windowStores) {
+ assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+ assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+ }
+ }
+
+ @Test(expected = InvalidStateStoreException.class)
+ public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() {
+ mockThread(true);
+ taskOne.getStore("kv-store").close();
+ provider.stores("kv-store", QueryableStoreTypes.keyValueStore());
+ }
+
+ @Test(expected = InvalidStateStoreException.class)
+ public void shouldThrowInvalidStoreExceptionIfTsKVStoreClosed() {
+ mockThread(true);
+ taskOne.getStore("timestamped-kv-store").close();
+ provider.stores("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore());
}
@Test(expected = InvalidStateStoreException.class)
@@ -160,10 +268,10 @@ public class StreamThreadStateStoreProviderTest {
}
@Test(expected = InvalidStateStoreException.class)
- public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() {
+ public void shouldThrowInvalidStoreExceptionIfTsWindowStoreClosed() {
mockThread(true);
- taskOne.getStore("kv-store").close();
- provider.stores("kv-store", QueryableStoreTypes.keyValueStore());
+ taskOne.getStore("timestamped-window-store").close();
+ provider.stores("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore());
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index 08945d5..dbdd0b4 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -23,16 +23,13 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.io.File;
-public class NoOpReadOnlyStore<K, V>
- implements ReadOnlyKeyValueStore<K, V>, StateStore {
-
+public class NoOpReadOnlyStore<K, V> implements ReadOnlyKeyValueStore<K, V>, StateStore {
private final String name;
private final boolean rocksdbStore;
private boolean open = true;
public boolean initialized;
public boolean flushed;
-
public NoOpReadOnlyStore() {
this("", false);
}