You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/03/06 22:57:23 UTC
[kafka] branch trunk updated: KAFKA-3522: Add public interfaces for
timestamped stores (#6175)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ab00c51 KAFKA-3522: Add public interfaces for timestamped stores (#6175)
ab00c51 is described below
commit ab00c51b6362b41071ae32611ebf698ba9c221ee
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Mar 6 14:57:08 2019 -0800
KAFKA-3522: Add public interfaces for timestamped stores (#6175)
Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../kafka/streams/state/QueryableStoreType.java | 6 +-
.../kafka/streams/state/QueryableStoreTypes.java | 63 ++++++-
.../apache/kafka/streams/state/SessionStore.java | 10 +-
.../streams/state/TimestampedWindowStore.java | 34 ++++
.../kafka/streams/state/ValueAndTimestamp.java | 11 +-
.../apache/kafka/streams/state/WindowStore.java | 4 +-
.../state/internals/KeyValueIteratorFacade.java | 50 +++++
.../internals/ReadOnlyKeyValueStoreFacade.java | 52 ++++++
.../state/internals/ReadOnlyWindowStoreFacade.java | 123 ++++++++++++
.../internals/KeyValueIteratorFacadeTest.java | 87 +++++++++
.../internals/ReadOnlyKeyValueStoreFacadeTest.java | 99 ++++++++++
.../internals/ReadOnlyWindowStoreFacadeTest.java | 207 +++++++++++++++++++++
.../apache/kafka/streams/TopologyTestDriver.java | 102 +++++++++-
.../streams/internals/KeyValueStoreFacade.java | 92 +++++++++
.../kafka/streams/internals/WindowStoreFacade.java | 76 ++++++++
.../streams/internals/KeyValueStoreFacadeTest.java | 168 +++++++++++++++++
.../streams/internals/WindowStoreFacadeTest.java | 135 ++++++++++++++
17 files changed, 1300 insertions(+), 19 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
index 3d4f55c..79c335a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
@@ -43,7 +43,9 @@ public interface QueryableStoreType<T> {
*
* @param storeProvider provides access to all the underlying StateStore instances
* @param storeName The name of the Store
- * @return a read-only interface over a {@code StateStore} (cf. {@link org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
+ * @return a read-only interface over a {@code StateStore}
+ * (cf. {@link org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
*/
- T create(final StateStoreProvider storeProvider, final String storeName);
+ T create(final StateStoreProvider storeProvider,
+ final String storeName);
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index 297e181..7b1e8b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -43,6 +43,17 @@ public final class QueryableStoreTypes {
}
/**
+ * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore ReadOnlyKeyValueStore&lt;K, ValueAndTimestamp&lt;V&gt;&gt;}.
+ *
+ * @param <K> key type of the store
+ * @param <V> value type of the store
+ * @return {@link QueryableStoreTypes.TimestampedKeyValueStoreType}
+ */
+ public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> timestampedKeyValueStore() {
+ return new TimestampedKeyValueStoreType<>();
+ }
+
+ /**
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}.
*
* @param <K> key type of the store
@@ -54,6 +65,17 @@ public final class QueryableStoreTypes {
}
/**
+ * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore ReadOnlyWindowStore&lt;K, ValueAndTimestamp&lt;V&gt;&gt;}.
+ *
+ * @param <K> key type of the store
+ * @param <V> value type of the store
+ * @return {@link QueryableStoreTypes.TimestampedWindowStoreType}
+ */
+ public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> timestampedWindowStore() {
+ return new TimestampedWindowStoreType<>();
+ }
+
+ /**
* A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}.
*
* @param <K> key type of the store
@@ -79,7 +101,8 @@ public final class QueryableStoreTypes {
}
}
- static class KeyValueStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
+ public static class KeyValueStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, V>> {
+
KeyValueStoreType() {
super(ReadOnlyKeyValueStore.class);
}
@@ -92,7 +115,22 @@ public final class QueryableStoreTypes {
}
- static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
+ private static class TimestampedKeyValueStoreType<K, V>
+ extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> {
+
+ TimestampedKeyValueStoreType() {
+ super(ReadOnlyKeyValueStore.class);
+ }
+
+ @Override
+ public ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> create(final StateStoreProvider storeProvider,
+ final String storeName) {
+ return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, storeName);
+ }
+ }
+
+ public static class WindowStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, V>> {
+
WindowStoreType() {
super(ReadOnlyWindowStore.class);
}
@@ -104,14 +142,31 @@ public final class QueryableStoreTypes {
}
}
- static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {
+ private static class TimestampedWindowStoreType<K, V>
+ extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> {
+
+ TimestampedWindowStoreType() {
+ super(ReadOnlyWindowStore.class);
+ }
+
+ @Override
+ public ReadOnlyWindowStore<K, ValueAndTimestamp<V>> create(final StateStoreProvider storeProvider,
+ final String storeName) {
+ return new CompositeReadOnlyWindowStore<>(storeProvider, this, storeName);
+ }
+ }
+
+ public static class SessionStoreType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySessionStore<K, V>> {
+
SessionStoreType() {
super(ReadOnlySessionStore.class);
}
+
@Override
public ReadOnlySessionStore<K, V> create(final StateStoreProvider storeProvider,
final String storeName) {
return new CompositeReadOnlySessionStore<>(storeProvider, this, storeName);
}
}
-}
+
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
index c8b1805..4f897e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
@@ -16,11 +16,19 @@
*/
package org.apache.kafka.streams.state;
+import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
/**
- * Interface for storing the aggregated values of sessions
+ * Interface for storing the aggregated values of sessions.
+ * <p>
+ * The key is internally represented as {@link Windowed Windowed&lt;K&gt;} that comprises the plain key
+ * and the {@link Window} that represents window start- and end-timestamp.
+ * <p>
+ * If two sessions are merged, a new session with new start- and end-timestamp must be inserted into the store
+ * while the two old sessions must be deleted.
+ *
* @param <K> type of the record keys
* @param <AGG> type of the aggregated values
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/TimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedWindowStore.java
new file mode 100644
index 0000000..7d52c12
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedWindowStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.kstream.Windowed;
+
+/**
+ * Interface for storing the aggregated values of fixed-size time windows.
+ * <p>
+ * Note that the store's physical key type is {@link Windowed Windowed&lt;K&gt;}.
+ * In contrast to a {@link WindowStore} that stores plain windowedKeys-value pairs,
+ * a {@code TimestampedWindowStore} stores windowedKeys-(value/timestamp) pairs.
+ * <p>
+ * While the window start- and end-timestamp are fixed per window, the value-side timestamp is used
+ * to store the last update timestamp of the corresponding window.
+ *
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ */
+public interface TimestampedWindowStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> { }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
index a0acc77..8bb652c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
@@ -36,6 +36,15 @@ public class ValueAndTimestamp<V> {
this.timestamp = timestamp;
}
+ /**
+ * Create a new {@link ValueAndTimestamp} instance if the provided {@code value} is not {@code null}.
+ *
+ * @param value the value
+ * @param timestamp the timestamp
+ * @param <V> the type of the value
+ * @return a new {@link ValueAndTimestamp} instance if the provided {@code value} is not {@code null};
+ * otherwise {@code null} is returned
+ */
public static <V> ValueAndTimestamp<V> make(final V value,
final long timestamp) {
return value == null ? null : new ValueAndTimestamp<>(value, timestamp);
@@ -71,4 +80,4 @@ public class ValueAndTimestamp<V> {
public int hashCode() {
return Objects.hash(value, timestamp);
}
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 2203f59..f7eb37e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -26,7 +26,9 @@ import java.time.Instant;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
/**
- * A windowed store interface extending {@link StateStore}.
+ * Interface for storing the aggregated values of fixed-size time windows.
+ * <p>
+ * Note that the store's physical key type is {@link Windowed Windowed&lt;K&gt;}.
*
* @param <K> Type of keys
* @param <V> Type of values
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
new file mode 100644
index 0000000..099c3df
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+public class KeyValueIteratorFacade<K, V> implements KeyValueIterator<K, V> {
+ private final KeyValueIterator<K, ValueAndTimestamp<V>> innerIterator;
+
+ public KeyValueIteratorFacade(final KeyValueIterator<K, ValueAndTimestamp<V>> iterator) {
+ innerIterator = iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return innerIterator.hasNext();
+ }
+
+ @Override
+ public K peekNextKey() {
+ return innerIterator.peekNextKey();
+ }
+
+ @Override
+ public KeyValue<K, V> next() {
+ final KeyValue<K, ValueAndTimestamp<V>> innerKeyValue = innerIterator.next();
+ return KeyValue.pair(innerKeyValue.key, innerKeyValue.value.value());
+ }
+
+ @Override
+ public void close() {
+ innerIterator.close();
+ }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
new file mode 100644
index 0000000..fe24043
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+public class ReadOnlyKeyValueStoreFacade<K, V> implements ReadOnlyKeyValueStore<K, V> {
+ protected final TimestampedKeyValueStore<K, V> inner;
+
+ protected ReadOnlyKeyValueStoreFacade(final TimestampedKeyValueStore<K, V> store) {
+ inner = store;
+ }
+
+ @Override
+ public V get(final K key) {
+ final ValueAndTimestamp<V> valueAndTimestamp = inner.get(key);
+ return valueAndTimestamp == null ? null : valueAndTimestamp.value();
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(final K from,
+ final K to) {
+ return new KeyValueIteratorFacade<>(inner.range(from, to));
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new KeyValueIteratorFacade<>(inner.all());
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return inner.approximateNumEntries();
+ }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
new file mode 100644
index 0000000..46db1f2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+
+public class ReadOnlyWindowStoreFacade<K, V> implements ReadOnlyWindowStore<K, V> {
+ protected final TimestampedWindowStore<K, V> inner;
+
+ protected ReadOnlyWindowStoreFacade(final TimestampedWindowStore<K, V> store) {
+ inner = store;
+ }
+
+ @Override
+ public V fetch(final K key,
+ final long time) {
+ final ValueAndTimestamp<V> valueAndTimestamp = inner.fetch(key, time);
+ return valueAndTimestamp == null ? null : valueAndTimestamp.value();
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public WindowStoreIterator<V> fetch(final K key,
+ final long timeFrom,
+ final long timeTo) {
+ return new WindowStoreIteratorFacade<>(inner.fetch(key, timeFrom, timeTo));
+ }
+
+ @Override
+ public WindowStoreIterator<V> fetch(final K key,
+ final Instant from,
+ final Instant to) throws IllegalArgumentException {
+ return new WindowStoreIteratorFacade<>(inner.fetch(key, from, to));
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+ final K to,
+ final long timeFrom,
+ final long timeTo) {
+ return new KeyValueIteratorFacade<>(inner.fetch(from, to, timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+ final K to,
+ final Instant fromTime,
+ final Instant toTime) throws IllegalArgumentException {
+ return new KeyValueIteratorFacade<>(inner.fetch(from, to, fromTime, toTime));
+ }
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+ final long timeTo) {
+ return new KeyValueIteratorFacade<>(inner.fetchAll(timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from,
+ final Instant to) throws IllegalArgumentException {
+ final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> innerIterator = inner.fetchAll(from, to);
+ return new KeyValueIteratorFacade<>(innerIterator);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> all() {
+ final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> innerIterator = inner.all();
+ return new KeyValueIteratorFacade<>(innerIterator);
+ }
+
+ private static class WindowStoreIteratorFacade<V> implements WindowStoreIterator<V> {
+ final KeyValueIterator<Long, ValueAndTimestamp<V>> innerIterator;
+
+ WindowStoreIteratorFacade(final KeyValueIterator<Long, ValueAndTimestamp<V>> iterator) {
+ innerIterator = iterator;
+ }
+
+ @Override
+ public void close() {
+ innerIterator.close();
+ }
+
+ @Override
+ public Long peekNextKey() {
+ return innerIterator.peekNextKey();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return innerIterator.hasNext();
+ }
+
+ @Override
+ public KeyValue<Long, V> next() {
+ final KeyValue<Long, ValueAndTimestamp<V>> innerKeyValue = innerIterator.next();
+ return KeyValue.pair(innerKeyValue.key, innerKeyValue.value.value());
+ }
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacadeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacadeTest.java
new file mode 100644
index 0000000..19566e9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacadeTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(EasyMockRunner.class)
+public class KeyValueIteratorFacadeTest {
+ @Mock
+ private KeyValueIterator<String, ValueAndTimestamp<String>> mockedKeyValueIterator;
+
+ private KeyValueIteratorFacade<String, String> keyValueIteratorFacade;
+
+ @Before
+ public void setup() {
+ keyValueIteratorFacade = new KeyValueIteratorFacade<>(mockedKeyValueIterator);
+ }
+
+ @Test
+ public void shouldForwardHasNext() {
+ expect(mockedKeyValueIterator.hasNext()).andReturn(true).andReturn(false);
+ replay(mockedKeyValueIterator);
+
+ assertTrue(keyValueIteratorFacade.hasNext());
+ assertFalse(keyValueIteratorFacade.hasNext());
+ verify(mockedKeyValueIterator);
+ }
+
+ @Test
+ public void shouldForwardPeekNextKey() {
+ expect(mockedKeyValueIterator.peekNextKey()).andReturn("key");
+ replay(mockedKeyValueIterator);
+
+ assertThat(keyValueIteratorFacade.peekNextKey(), is("key"));
+ verify(mockedKeyValueIterator);
+ }
+
+ @Test
+ public void shouldReturnPlainKeyValuePairOnGet() {
+ expect(mockedKeyValueIterator.next()).andReturn(
+ new KeyValue<>("key", ValueAndTimestamp.make("value", 42L)));
+ replay(mockedKeyValueIterator);
+
+ assertThat(keyValueIteratorFacade.next(), is(KeyValue.pair("key", "value")));
+ verify(mockedKeyValueIterator);
+ }
+
+ @Test
+ public void shouldCloseInnerIterator() {
+ mockedKeyValueIterator.close();
+ expectLastCall();
+ replay(mockedKeyValueIterator);
+
+ keyValueIteratorFacade.close();
+ verify(mockedKeyValueIterator);
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
new file mode 100644
index 0000000..4b0ca79
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNull;
+
+@RunWith(EasyMockRunner.class)
+public class ReadOnlyKeyValueStoreFacadeTest {
+ @Mock
+ private TimestampedKeyValueStore<String, String> mockedKeyValueTimestampStore;
+ @Mock
+ private KeyValueIterator<String, ValueAndTimestamp<String>> mockedKeyValueTimestampIterator;
+
+ private ReadOnlyKeyValueStoreFacade<String, String> readOnlyKeyValueStoreFacade;
+
+ @Before
+ public void setup() {
+ readOnlyKeyValueStoreFacade = new ReadOnlyKeyValueStoreFacade<>(mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnPlainValueOnGet() {
+ expect(mockedKeyValueTimestampStore.get("key"))
+ .andReturn(ValueAndTimestamp.make("value", 42L));
+ expect(mockedKeyValueTimestampStore.get("unknownKey"))
+ .andReturn(null);
+ replay(mockedKeyValueTimestampStore);
+
+ assertThat(readOnlyKeyValueStoreFacade.get("key"), is("value"));
+ assertNull(readOnlyKeyValueStoreFacade.get("unknownKey"));
+ verify(mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnPlainKeyValuePairsForRangeIterator() {
+ expect(mockedKeyValueTimestampIterator.next())
+ .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 21L)))
+ .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 42L)));
+ expect(mockedKeyValueTimestampStore.range("key1", "key2")).andReturn(mockedKeyValueTimestampIterator);
+ replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+
+ final KeyValueIterator<String, String> iterator = readOnlyKeyValueStoreFacade.range("key1", "key2");
+ assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+ verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnPlainKeyValuePairsForAllIterator() {
+ expect(mockedKeyValueTimestampIterator.next())
+ .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 21L)))
+ .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 42L)));
+ expect(mockedKeyValueTimestampStore.all()).andReturn(mockedKeyValueTimestampIterator);
+ replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+
+ final KeyValueIterator<String, String> iterator = readOnlyKeyValueStoreFacade.all();
+ assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+ verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldForwardApproximateNumEntries() {
+ expect(mockedKeyValueTimestampStore.approximateNumEntries()).andReturn(42L);
+ replay(mockedKeyValueTimestampStore);
+
+ assertThat(readOnlyKeyValueStoreFacade.approximateNumEntries(), is(42L));
+ verify(mockedKeyValueTimestampStore);
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacadeTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacadeTest.java
new file mode 100644
index 0000000..47ebeaa
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacadeTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.time.Instant;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNull;
+
+@RunWith(EasyMockRunner.class)
+public class ReadOnlyWindowStoreFacadeTest {
+ @Mock
+ private TimestampedWindowStore<String, String> mockedWindowTimestampStore;
+ @Mock
+ private WindowStoreIterator<ValueAndTimestamp<String>> mockedWindowTimestampIterator;
+ @Mock
+ private KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>> mockedKeyValueWindowTimestampIterator;
+
+ private ReadOnlyWindowStoreFacade<String, String> readOnlyWindowStoreFacade;
+
+ @Before
+ public void setup() {
+ readOnlyWindowStoreFacade = new ReadOnlyWindowStoreFacade<>(mockedWindowTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnPlainKeyValuePairsOnSingleKeyFetch() {
+ expect(mockedWindowTimestampStore.fetch("key1", 21L))
+ .andReturn(ValueAndTimestamp.make("value1", 42L));
+ expect(mockedWindowTimestampStore.fetch("unknownKey", 21L))
+ .andReturn(null);
+ replay(mockedWindowTimestampStore);
+
+ assertThat(readOnlyWindowStoreFacade.fetch("key1", 21L), is("value1"));
+ assertNull(readOnlyWindowStoreFacade.fetch("unknownKey", 21L));
+
+ verify(mockedWindowTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnPlainKeyValuePairsOnSingleKeyFetchLongParameters() {
+ expect(mockedWindowTimestampIterator.next())
+ .andReturn(KeyValue.pair(21L, ValueAndTimestamp.make("value1", 22L)))
+ .andReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2", 23L)));
+ expect(mockedWindowTimestampStore.fetch("key1", 21L, 42L))
+ .andReturn(mockedWindowTimestampIterator);
+ replay(mockedWindowTimestampIterator, mockedWindowTimestampStore);
+
+ final WindowStoreIterator<String> iterator =
+ readOnlyWindowStoreFacade.fetch("key1", 21L, 42L);
+
+ assertThat(iterator.next(), is(KeyValue.pair(21L, "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
+ verify(mockedWindowTimestampIterator, mockedWindowTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnPlainKeyValuePairsOnSingleKeyFetchInstantParameters() {
+ expect(mockedWindowTimestampIterator.next())
+ .andReturn(KeyValue.pair(21L, ValueAndTimestamp.make("value1", 22L)))
+ .andReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2", 23L)));
+ expect(mockedWindowTimestampStore.fetch("key1", Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
+ .andReturn(mockedWindowTimestampIterator);
+ replay(mockedWindowTimestampIterator, mockedWindowTimestampStore);
+
+ final WindowStoreIterator<String> iterator =
+ readOnlyWindowStoreFacade.fetch("key1", Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L));
+
+ assertThat(iterator.next(), is(KeyValue.pair(21L, "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
+ verify(mockedWindowTimestampIterator, mockedWindowTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnPlainKeyValuePairsOnRangeFetchLongParameters() {
+ expect(mockedKeyValueWindowTimestampIterator.next())
+ .andReturn(KeyValue.pair(
+ new Windowed<>("key1", new TimeWindow(21L, 22L)),
+ ValueAndTimestamp.make("value1", 22L)))
+ .andReturn(KeyValue.pair(
+ new Windowed<>("key2", new TimeWindow(42L, 43L)),
+ ValueAndTimestamp.make("value2", 100L)));
+ expect(mockedWindowTimestampStore.fetch("key1", "key2", 21L, 42L))
+ .andReturn(mockedKeyValueWindowTimestampIterator);
+ replay(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ readOnlyWindowStoreFacade.fetch("key1", "key2", 21L, 42L);
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new TimeWindow(21L, 22L)), "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new TimeWindow(42L, 43L)), "value2")));
+ verify(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnPlainKeyValuePairsOnRangeFetchInstantParameters() {
+ expect(mockedKeyValueWindowTimestampIterator.next())
+ .andReturn(KeyValue.pair(
+ new Windowed<>("key1", new TimeWindow(21L, 22L)),
+ ValueAndTimestamp.make("value1", 22L)))
+ .andReturn(KeyValue.pair(
+ new Windowed<>("key2", new TimeWindow(42L, 43L)),
+ ValueAndTimestamp.make("value2", 100L)));
+ expect(mockedWindowTimestampStore.fetch("key1", "key2", Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
+ .andReturn(mockedKeyValueWindowTimestampIterator);
+ replay(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ readOnlyWindowStoreFacade.fetch("key1", "key2", Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L));
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new TimeWindow(21L, 22L)), "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new TimeWindow(42L, 43L)), "value2")));
+ verify(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnPlainKeyValuePairsOnFetchAllLongParameters() {
+ expect(mockedKeyValueWindowTimestampIterator.next())
+ .andReturn(KeyValue.pair(
+ new Windowed<>("key1", new TimeWindow(21L, 22L)),
+ ValueAndTimestamp.make("value1", 22L)))
+ .andReturn(KeyValue.pair(
+ new Windowed<>("key2", new TimeWindow(42L, 43L)),
+ ValueAndTimestamp.make("value2", 100L)));
+ expect(mockedWindowTimestampStore.fetchAll(21L, 42L))
+ .andReturn(mockedKeyValueWindowTimestampIterator);
+ replay(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ readOnlyWindowStoreFacade.fetchAll(21L, 42L);
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new TimeWindow(21L, 22L)), "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new TimeWindow(42L, 43L)), "value2")));
+ verify(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnPlainKeyValuePairsOnFetchAllInstantParameters() {
+ expect(mockedKeyValueWindowTimestampIterator.next())
+ .andReturn(KeyValue.pair(
+ new Windowed<>("key1", new TimeWindow(21L, 22L)),
+ ValueAndTimestamp.make("value1", 22L)))
+ .andReturn(KeyValue.pair(
+ new Windowed<>("key2", new TimeWindow(42L, 43L)),
+ ValueAndTimestamp.make("value2", 100L)));
+ expect(mockedWindowTimestampStore.fetchAll(Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
+ .andReturn(mockedKeyValueWindowTimestampIterator);
+ replay(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ readOnlyWindowStoreFacade.fetchAll(Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L));
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new TimeWindow(21L, 22L)), "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new TimeWindow(42L, 43L)), "value2")));
+ verify(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnPlainKeyValuePairsOnAll() {
+ expect(mockedKeyValueWindowTimestampIterator.next())
+ .andReturn(KeyValue.pair(
+ new Windowed<>("key1", new TimeWindow(21L, 22L)),
+ ValueAndTimestamp.make("value1", 22L)))
+ .andReturn(KeyValue.pair(
+ new Windowed<>("key2", new TimeWindow(42L, 43L)),
+ ValueAndTimestamp.make("value2", 100L)));
+ expect(mockedWindowTimestampStore.all()).andReturn(mockedKeyValueWindowTimestampIterator);
+ replay(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+
+ final KeyValueIterator<Windowed<String>, String> iterator = readOnlyWindowStoreFacade.all();
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1", new TimeWindow(21L, 22L)), "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2", new TimeWindow(42L, 43L)), "value2")));
+ verify(mockedKeyValueWindowTimestampIterator, mockedWindowTimestampStore);
+ }
+}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index e8f4fd6..b00d229 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -40,7 +40,9 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.internals.KeyValueStoreFacade;
import org.apache.kafka.streams.internals.QuietStreamsConfig;
+import org.apache.kafka.streams.internals.WindowStoreFacade;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -62,10 +64,15 @@ import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
@@ -175,6 +182,8 @@ import java.util.regex.Pattern;
@InterfaceStability.Evolving
public class TopologyTestDriver implements Closeable {
+ private static final Logger log = LoggerFactory.getLogger(TopologyTestDriver.class);
+
private final Time mockWallClockTime;
private final InternalTopologyBuilder internalTopologyBuilder;
@@ -245,7 +254,7 @@ public class TopologyTestDriver implements Closeable {
processorTopology = internalTopologyBuilder.build(null);
globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
final boolean createStateDirectory = processorTopology.hasPersistentLocalStore() ||
- (globalTopology != null && globalTopology.hasPersistentGlobalStore());
+ (globalTopology != null && globalTopology.hasPersistentGlobalStore());
final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
@@ -465,7 +474,8 @@ public class TopologyTestDriver implements Closeable {
// Forward back into the topology if the produced record is to an internal or a source topic ...
final String outputTopicName = record.topic();
if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)
- || globalPartitionsByTopic.containsKey(outputTopicName)) {
+ || globalPartitionsByTopic.containsKey(outputTopicName)) {
+
final byte[] serializedKey = record.key();
final byte[] serializedValue = record.value();
@@ -564,7 +574,9 @@ public class TopologyTestDriver implements Closeable {
* @return all stores my name
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
+ * @see #getTimestampedKeyValueStore(String)
* @see #getWindowStore(String)
+ * @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
*/
@SuppressWarnings("WeakerAccess")
@@ -587,7 +599,9 @@ public class TopologyTestDriver implements Closeable {
* @return the state store, or {@code null} if no store has been registered with the given name
* @see #getAllStateStores()
* @see #getKeyValueStore(String)
+ * @see #getTimestampedKeyValueStore(String)
* @see #getWindowStore(String)
+ * @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
*/
@SuppressWarnings("WeakerAccess")
@@ -611,46 +625,112 @@ public class TopologyTestDriver implements Closeable {
}
/**
- * Get the {@link KeyValueStore} with the given name.
+ * Get the {@link KeyValueStore} or {@link TimestampedKeyValueStore} with the given name.
* The store can be a "regular" or global store.
* <p>
+ * If the registered store is a {@link TimestampedKeyValueStore} this method will return a value-only query
+ * interface. <strong>It is highly recommended to update the code for this case to avoid bugs and to use
+ * {@link #getTimestampedKeyValueStore(String)} for full store access instead.</strong>
+ * <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
*
* @param name the name of the store
- * @return the key value store, or {@code null} if no {@link KeyValueStore} has been registered with the given name
+ * @return the key value store, or {@code null} if no {@link KeyValueStore} or {@link TimestampedKeyValueStore}
+ * has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
+ * @see #getTimestampedKeyValueStore(String)
* @see #getWindowStore(String)
+ * @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
*/
@SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
final StateStore store = getStateStore(name);
+ if (store instanceof TimestampedKeyValueStore) {
+ log.info("Method #getTimestampedKeyValueStore() should be used to access a TimestampedKeyValueStore.");
+ return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>) store);
+ }
return store instanceof KeyValueStore ? (KeyValueStore<K, V>) store : null;
}
/**
- * Get the {@link WindowStore} with the given name.
+ * Get the {@link TimestampedKeyValueStore} with the given name.
* The store can be a "regular" or global store.
* <p>
* This is often useful in test cases to pre-populate the store before the test case instructs the topology to
* {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
*
* @param name the name of the store
- * @return the key value store, or {@code null} if no {@link WindowStore} has been registered with the given name
+ * @return the key value store, or {@code null} if no {@link TimestampedKeyValueStore} has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
- * @see #getSessionStore(String) (String)
+ * @see #getWindowStore(String)
+ * @see #getTimestampedWindowStore(String)
+ * @see #getSessionStore(String)
*/
- @SuppressWarnings({"unchecked", "WeakerAccess", "unused"})
+ @SuppressWarnings({"unchecked", "WeakerAccess"})
+ public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> getTimestampedKeyValueStore(final String name) {
+ final StateStore store = getStateStore(name);
+ return store instanceof TimestampedKeyValueStore ? (TimestampedKeyValueStore<K, V>) store : null;
+ }
+
+ /**
+ * Get the {@link WindowStore} or {@link TimestampedWindowStore} with the given name.
+ * The store can be a "regular" or global store.
+ * <p>
+ * If the registered store is a {@link TimestampedWindowStore} this method will return a value-only query
+ * interface. <strong>It is highly recommended to update the code for this case to avoid bugs and to use
+ * {@link #getTimestampedWindowStore(String)} for full store access instead.</strong>
+ * <p>
+ * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+ * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+ *
+ * @param name the name of the store
+ * @return the window store, or {@code null} if no {@link WindowStore} or {@link TimestampedWindowStore}
+ * has been registered with the given name
+ * @see #getAllStateStores()
+ * @see #getStateStore(String)
+ * @see #getKeyValueStore(String)
+ * @see #getTimestampedKeyValueStore(String)
+ * @see #getTimestampedWindowStore(String)
+ * @see #getSessionStore(String)
+ */
+ @SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> WindowStore<K, V> getWindowStore(final String name) {
final StateStore store = getStateStore(name);
+ if (store instanceof TimestampedWindowStore) {
+ log.info("Method #getTimestampedWindowStore() should be used to access a TimestampedWindowStore.");
+ return new WindowStoreFacade<>((TimestampedWindowStore<K, V>) store);
+ }
return store instanceof WindowStore ? (WindowStore<K, V>) store : null;
}
/**
+ * Get the {@link TimestampedWindowStore} with the given name.
+ * The store can be a "regular" or global store.
+ * <p>
+ * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
+ * {@link #pipeInput(ConsumerRecord) process an input message}, and/or to check the store afterward.
+ *
+ * @param name the name of the store
+ * @return the window store, or {@code null} if no {@link TimestampedWindowStore} has been registered with the given name
+ * @see #getAllStateStores()
+ * @see #getStateStore(String)
+ * @see #getKeyValueStore(String)
+ * @see #getTimestampedKeyValueStore(String)
+ * @see #getWindowStore(String)
+ * @see #getSessionStore(String)
+ */
+ @SuppressWarnings({"unchecked", "WeakerAccess"})
+ public <K, V> WindowStore<K, ValueAndTimestamp<V>> getTimestampedWindowStore(final String name) {
+ final StateStore store = getStateStore(name);
+ return store instanceof TimestampedWindowStore ? (TimestampedWindowStore<K, V>) store : null;
+ }
+
+ /**
* Get the {@link SessionStore} with the given name.
* The store can be a "regular" or global store.
* <p>
@@ -662,9 +742,11 @@ public class TopologyTestDriver implements Closeable {
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
+ * @see #getTimestampedKeyValueStore(String)
* @see #getWindowStore(String)
+ * @see #getTimestampedWindowStore(String)
*/
- @SuppressWarnings({"unchecked", "WeakerAccess", "unused"})
+ @SuppressWarnings({"unchecked", "WeakerAccess"})
public <K, V> SessionStore<K, V> getSessionStore(final String name) {
final StateStore store = getStateStore(name);
return store instanceof SessionStore ? (SessionStore<K, V>) store : null;
@@ -753,4 +835,4 @@ public class TopologyTestDriver implements Closeable {
}
return consumer;
}
-}
+}
\ No newline at end of file
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
new file mode 100644
index 0000000..7f1da6e
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
+
+import java.util.List;
+
+public class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K, V> implements KeyValueStore<K, V> {
+
+ public KeyValueStoreFacade(final TimestampedKeyValueStore<K, V> inner) {
+ super(inner);
+ }
+
+ @Override
+ public void init(final ProcessorContext context,
+ final StateStore root) {
+ inner.init(context, root);
+ }
+
+ @Override
+ public void put(final K key,
+ final V value) {
+ inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
+ }
+
+ @Override
+ public V putIfAbsent(final K key,
+ final V value) {
+ final ValueAndTimestamp<V> old = inner.putIfAbsent(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
+ return old == null ? null : old.value();
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<K, V>> entries) {
+ for (final KeyValue<K, V> entry : entries) {
+ inner.put(entry.key, ValueAndTimestamp.make(entry.value, ConsumerRecord.NO_TIMESTAMP));
+ }
+ }
+
+ @Override
+ public V delete(final K key) {
+ final ValueAndTimestamp<V> old = inner.delete(key);
+ return old == null ? null : old.value();
+ }
+
+ @Override
+ public void flush() {
+ inner.flush();
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public String name() {
+ return inner.name();
+ }
+
+ @Override
+ public boolean persistent() {
+ return inner.persistent();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return inner.isOpen();
+ }
+}
\ No newline at end of file
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
new file mode 100644
index 0000000..f6f8f33
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.ReadOnlyWindowStoreFacade;
+
+public class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K, V> implements WindowStore<K, V> {
+
+ public WindowStoreFacade(final TimestampedWindowStore<K, V> store) {
+ super(store);
+ }
+
+ @Override
+ public void init(final ProcessorContext context,
+ final StateStore root) {
+ inner.init(context, root);
+ }
+
+ @Override
+ public void put(final K key,
+ final V value) {
+ inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
+ }
+
+ @Override
+ public void put(final K key,
+ final V value,
+ final long windowStartTimestamp) {
+ inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP), windowStartTimestamp);
+ }
+
+ @Override
+ public void flush() {
+ inner.flush();
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public String name() {
+ return inner.name();
+ }
+
+ @Override
+ public boolean persistent() {
+ return inner.persistent();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return inner.isOpen();
+ }
+}
\ No newline at end of file
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
new file mode 100644
index 0000000..b7814c7
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static java.util.Arrays.asList;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
+
+@RunWith(EasyMockRunner.class)
+public class KeyValueStoreFacadeTest {
+ @Mock
+ private TimestampedKeyValueStore<String, String> mockedKeyValueTimestampStore;
+ @Mock
+ private KeyValueIterator<String, ValueAndTimestamp<String>> mockedKeyValueTimestampIterator;
+
+ private KeyValueStoreFacade<String, String> keyValueStoreFacade;
+
+ @Before
+ public void setup() {
+ keyValueStoreFacade = new KeyValueStoreFacade<>(mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldForwardInit() {
+ final ProcessorContext context = mock(ProcessorContext.class);
+ final StateStore store = mock(StateStore.class);
+ mockedKeyValueTimestampStore.init(context, store);
+ expectLastCall();
+ replay(mockedKeyValueTimestampStore);
+
+ keyValueStoreFacade.init(context, store);
+ verify(mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldPutWithUnknownTimestamp() {
+ mockedKeyValueTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP));
+ expectLastCall();
+ replay(mockedKeyValueTimestampStore);
+
+ keyValueStoreFacade.put("key", "value");
+ verify(mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldPutIfAbsentWithUnknownTimestamp() {
+ expect(mockedKeyValueTimestampStore.putIfAbsent("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP)))
+ .andReturn(null)
+ .andReturn(ValueAndTimestamp.make("oldValue", 42L));
+ replay(mockedKeyValueTimestampStore);
+
+ assertNull(keyValueStoreFacade.putIfAbsent("key", "value"));
+ assertThat(keyValueStoreFacade.putIfAbsent("key", "value"), is("oldValue"));
+ verify(mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldPutAllWithUnknownTimestamp() {
+ mockedKeyValueTimestampStore.put("key1", ValueAndTimestamp.make("value1", ConsumerRecord.NO_TIMESTAMP));
+ mockedKeyValueTimestampStore.put("key2", ValueAndTimestamp.make("value2", ConsumerRecord.NO_TIMESTAMP));
+ expectLastCall();
+ replay(mockedKeyValueTimestampStore);
+
+ keyValueStoreFacade.putAll(asList(
+ KeyValue.pair("key1", "value1"),
+ KeyValue.pair("key2", "value2")
+ ));
+ verify(mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldDeleteAndReturnPlainValue() {
+ expect(mockedKeyValueTimestampStore.delete("key"))
+ .andReturn(null)
+ .andReturn(ValueAndTimestamp.make("oldValue", 42L));
+ replay(mockedKeyValueTimestampStore);
+
+ assertNull(keyValueStoreFacade.delete("key"));
+ assertThat(keyValueStoreFacade.delete("key"), is("oldValue"));
+ verify(mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldForwardFlush() {
+ mockedKeyValueTimestampStore.flush();
+ expectLastCall();
+ replay(mockedKeyValueTimestampStore);
+
+ keyValueStoreFacade.flush();
+ verify(mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldForwardClose() {
+ mockedKeyValueTimestampStore.close();
+ expectLastCall();
+ replay(mockedKeyValueTimestampStore);
+
+ keyValueStoreFacade.close();
+ verify(mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnName() {
+ expect(mockedKeyValueTimestampStore.name()).andReturn("name");
+ replay(mockedKeyValueTimestampStore);
+
+ assertThat(keyValueStoreFacade.name(), is("name"));
+ verify(mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnIsPersistent() {
+ expect(mockedKeyValueTimestampStore.persistent())
+ .andReturn(true)
+ .andReturn(false);
+ replay(mockedKeyValueTimestampStore);
+
+ assertThat(keyValueStoreFacade.persistent(), is(true));
+ assertThat(keyValueStoreFacade.persistent(), is(false));
+ verify(mockedKeyValueTimestampStore);
+ }
+
+ @Test
+ public void shouldReturnIsOpen() {
+ expect(mockedKeyValueTimestampStore.isOpen())
+ .andReturn(true)
+ .andReturn(false);
+ replay(mockedKeyValueTimestampStore);
+
+ assertThat(keyValueStoreFacade.isOpen(), is(true));
+ assertThat(keyValueStoreFacade.isOpen(), is(false));
+ verify(mockedKeyValueTimestampStore);
+ }
+}
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
new file mode 100644
index 0000000..972b308
--- /dev/null
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/internals/WindowStoreFacadeTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+ @RunWith(EasyMockRunner.class) // the EasyMock runner populates the @Mock field below
+ public class WindowStoreFacadeTest { // checks that WindowStoreFacade hands each call to the TimestampedWindowStore it was built with
+ @Mock
+ private TimestampedWindowStore<String, String> mockedWindowTimestampStore; // the store every test records expectations on
+
+ private WindowStoreFacade<String, String> windowStoreFacade; // object under test, rebuilt in setup()
+
+ @Before
+ public void setup() { // builds the facade around the mock before each test
+ windowStoreFacade = new WindowStoreFacade<>(mockedWindowTimestampStore);
+ }
+
+ @Test
+ public void shouldForwardInit() { // init(context, root) on the facade must reach the mock with the same two arguments
+ final ProcessorContext context = mock(ProcessorContext.class); // never replayed; used only as an argument to match on
+ final StateStore store = mock(StateStore.class); // never replayed; used only as an argument to match on
+ mockedWindowTimestampStore.init(context, store); // record phase: registers the expected call
+ expectLastCall();
+ replay(mockedWindowTimestampStore);
+
+ windowStoreFacade.init(context, store);
+ verify(mockedWindowTimestampStore); // fails if init(...) was not received as recorded
+ }
+
+ @Test
+ public void shouldPutWithUnknownTimestamp() { // a plain-value put must reach the mock wrapped with ConsumerRecord.NO_TIMESTAMP
+ mockedWindowTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP)); // expected translated call
+ expectLastCall();
+ replay(mockedWindowTimestampStore);
+
+ windowStoreFacade.put("key", "value"); // caller supplies no timestamp
+ verify(mockedWindowTimestampStore); // fails if the mock did not receive the recorded put
+ }
+
+ @Test
+ public void shouldPutWindowStartTimestampWithUnknownTimestamp() { // same wrapping as above; the third argument must pass through
+ mockedWindowTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP), 21L); // expected translated call
+ expectLastCall();
+ replay(mockedWindowTimestampStore);
+
+ windowStoreFacade.put("key", "value", 21L); // 21L is the window start timestamp, going by the test name
+ verify(mockedWindowTimestampStore); // fails if the mock did not receive the recorded put
+ }
+
+ @Test
+ public void shouldForwardFlush() { // flush() on the facade must be forwarded to the mock
+ mockedWindowTimestampStore.flush(); // record phase: registers the expected call
+ expectLastCall();
+ replay(mockedWindowTimestampStore);
+
+ windowStoreFacade.flush();
+ verify(mockedWindowTimestampStore); // fails if the recorded flush() was never received
+ }
+
+ @Test
+ public void shouldForwardClose() { // close() on the facade must be forwarded to the mock
+ mockedWindowTimestampStore.close(); // record phase: registers the expected call
+ expectLastCall();
+ replay(mockedWindowTimestampStore);
+
+ windowStoreFacade.close();
+ verify(mockedWindowTimestampStore); // fails if the recorded close() was never received
+ }
+
+ @Test
+ public void shouldReturnName() { // name() on the facade must return what the mock reports
+ expect(mockedWindowTimestampStore.name()).andReturn("name"); // stub one name() call on the mock
+ replay(mockedWindowTimestampStore);
+
+ assertThat(windowStoreFacade.name(), is("name"));
+ verify(mockedWindowTimestampStore); // confirms name() was actually invoked on the mock
+ }
+
+ @Test
+ public void shouldReturnIsPersistent() { // persistent() on the facade must mirror the mock on every call
+ expect(mockedWindowTimestampStore.persistent())
+ .andReturn(true) // answer for the first call
+ .andReturn(false); // answer for the second call
+ replay(mockedWindowTimestampStore);
+
+ assertThat(windowStoreFacade.persistent(), is(true));
+ assertThat(windowStoreFacade.persistent(), is(false)); // a changed answer shows the result is re-read, not cached
+ verify(mockedWindowTimestampStore); // both recorded calls must have been consumed
+ }
+
+ @Test
+ public void shouldReturnIsOpen() { // isOpen() on the facade must mirror the mock on every call
+ expect(mockedWindowTimestampStore.isOpen())
+ .andReturn(true) // answer for the first call
+ .andReturn(false); // answer for the second call
+ replay(mockedWindowTimestampStore);
+
+ assertThat(windowStoreFacade.isOpen(), is(true));
+ assertThat(windowStoreFacade.isOpen(), is(false)); // a changed answer shows the result is re-read, not cached
+ verify(mockedWindowTimestampStore); // both recorded calls must have been consumed
+ }
+
+}