You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/12 16:28:31 UTC
[kafka] branch trunk updated: KAFKA-3522: add missing guards for
TimestampedXxxStore (#6356)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ab7ea07 KAFKA-3522: add missing guards for TimestampedXxxStore (#6356)
ab7ea07 is described below
commit ab7ea07f5e57ec405dc7fddce95de7c639a2fd6e
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Tue Mar 12 09:28:14 2019 -0700
KAFKA-3522: add missing guards for TimestampedXxxStore (#6356)
Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
.../processor/internals/ProcessorContextImpl.java | 27 +++++-
.../internals/GlobalProcessorContextImplTest.java | 106 ++++++++++++++++++---
.../internals/ProcessorContextImplTest.java | 95 ++++++++++++++++++
3 files changed, 215 insertions(+), 13 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c10ea09..5f32a3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
@@ -84,7 +85,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
final StateStore global = stateManager.getGlobalStore(name);
if (global != null) {
- if (global instanceof KeyValueStore) {
+ if (global instanceof TimestampedKeyValueStore) {
+ return new TimestampedKeyValueStoreReadOnlyDecorator((TimestampedKeyValueStore) global);
+ } else if (global instanceof KeyValueStore) {
return new KeyValueStoreReadOnlyDecorator((KeyValueStore) global);
} else if (global instanceof TimestampedWindowStore) {
return new TimestampedWindowStoreReadOnlyDecorator((TimestampedWindowStore) global);
@@ -108,7 +111,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
final StateStore store = stateManager.getStore(name);
- if (store instanceof KeyValueStore) {
+ if (store instanceof TimestampedKeyValueStore) {
+ return new TimestampedKeyValueStoreReadWriteDecorator((TimestampedKeyValueStore) store);
+ } else if (store instanceof KeyValueStore) {
return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
} else if (store instanceof TimestampedWindowStore) {
return new TimestampedWindowStoreReadWriteDecorator((TimestampedWindowStore) store);
@@ -294,6 +299,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
+ private static class TimestampedKeyValueStoreReadOnlyDecorator<K, V>
+ extends KeyValueStoreReadOnlyDecorator<K, ValueAndTimestamp<V>>
+ implements TimestampedKeyValueStore<K, V> {
+
+ private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore<K, V> inner) {
+ super(inner);
+ }
+ }
+
private static class WindowStoreReadOnlyDecorator<K, V>
extends StateStoreReadOnlyDecorator<WindowStore<K, V>, K, V>
implements WindowStore<K, V> {
@@ -484,6 +498,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
+ private static class TimestampedKeyValueStoreReadWriteDecorator<K, V>
+ extends KeyValueStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
+ implements TimestampedKeyValueStore<K, V> {
+
+ private TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) {
+ super(inner);
+ }
+ }
+
static class WindowStoreReadWriteDecorator<K, V>
extends StateStoreReadWriteDecorator<WindowStore<K, V>, K, V>
implements WindowStore<K, V> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 4153cca..b36557c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -21,6 +21,10 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowStore;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Before;
import org.junit.Test;
@@ -39,6 +43,11 @@ import static org.junit.Assert.fail;
public class GlobalProcessorContextImplTest {
private static final String GLOBAL_STORE_NAME = "global-store";
+ private static final String GLOBAL_KEY_VALUE_STORE_NAME = "global-key-value-store";
+ private static final String GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME = "global-timestamped-key-value-store";
+ private static final String GLOBAL_WINDOW_STORE_NAME = "global-window-store";
+ private static final String GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME = "global-timestamped-window-store";
+ private static final String GLOBAL_SESSION_STORE_NAME = "global-session-store";
private static final String UNKNOWN_STORE = "unknown-store";
private static final String CHILD_PROCESSOR = "child";
@@ -56,7 +65,12 @@ public class GlobalProcessorContextImplTest {
replay(streamsConfig);
final StateManager stateManager = mock(StateManager.class);
- expect(stateManager.getGlobalStore(GLOBAL_STORE_NAME)).andReturn(mock(KeyValueStore.class));
+ expect(stateManager.getGlobalStore(GLOBAL_STORE_NAME)).andReturn(mock(StateStore.class));
+ expect(stateManager.getGlobalStore(GLOBAL_KEY_VALUE_STORE_NAME)).andReturn(mock(KeyValueStore.class));
+ expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME)).andReturn(mock(TimestampedKeyValueStore.class));
+ expect(stateManager.getGlobalStore(GLOBAL_WINDOW_STORE_NAME)).andReturn(mock(WindowStore.class));
+ expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME)).andReturn(mock(TimestampedWindowStore.class));
+ expect(stateManager.getGlobalStore(GLOBAL_SESSION_STORE_NAME)).andReturn(mock(SessionStore.class));
expect(stateManager.getGlobalStore(UNKNOWN_STORE)).andReturn(null);
replay(stateManager);
@@ -86,7 +100,7 @@ public class GlobalProcessorContextImplTest {
@Test
public void shouldReturnGlobalOrNullStore() {
- assertThat(globalContext.getStateStore(GLOBAL_STORE_NAME), new IsInstanceOf(KeyValueStore.class));
+ assertThat(globalContext.getStateStore(GLOBAL_STORE_NAME), new IsInstanceOf(StateStore.class));
assertNull(globalContext.getStateStore(UNKNOWN_STORE));
}
@@ -135,22 +149,92 @@ public class GlobalProcessorContextImplTest {
}
@Test
- public void shouldNotAllowInit() {
- final StateStore store = globalContext.getStateStore(GLOBAL_STORE_NAME);
+ public void shouldNotAllowInitForKeyValueStore() {
+ final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
try {
store.init(null, null);
fail("Should have thrown UnsupportedOperationException.");
- } catch (final UnsupportedOperationException expected) {
- }
+ } catch (final UnsupportedOperationException expected) { }
}
@Test
- public void shouldNotAllowClose() {
- final StateStore store = globalContext.getStateStore(GLOBAL_STORE_NAME);
+ public void shouldNotAllowInitForTimestampedKeyValueStore() {
+ final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
+ try {
+ store.init(null, null);
+ fail("Should have thrown UnsupportedOperationException.");
+ } catch (final UnsupportedOperationException expected) { }
+ }
+
+ @Test
+ public void shouldNotAllowInitForWindowStore() {
+ final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
+ try {
+ store.init(null, null);
+ fail("Should have thrown UnsupportedOperationException.");
+ } catch (final UnsupportedOperationException expected) { }
+ }
+
+ @Test
+ public void shouldNotAllowInitForTimestampedWindowStore() {
+ final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
+ try {
+ store.init(null, null);
+ fail("Should have thrown UnsupportedOperationException.");
+ } catch (final UnsupportedOperationException expected) { }
+ }
+
+ @Test
+ public void shouldNotAllowInitForSessionStore() {
+ final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
+ try {
+ store.init(null, null);
+ fail("Should have thrown UnsupportedOperationException.");
+ } catch (final UnsupportedOperationException expected) { }
+ }
+
+ @Test
+ public void shouldNotAllowCloseForKeyValueStore() {
+ final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
+ try {
+ store.close();
+ fail("Should have thrown UnsupportedOperationException.");
+ } catch (final UnsupportedOperationException expected) { }
+ }
+
+ @Test
+ public void shouldNotAllowCloseForTimestampedKeyValueStore() {
+ final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
+ try {
+ store.close();
+ fail("Should have thrown UnsupportedOperationException.");
+ } catch (final UnsupportedOperationException expected) { }
+ }
+
+ @Test
+ public void shouldNotAllowCloseForWindowStore() {
+ final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
+ try {
+ store.close();
+ fail("Should have thrown UnsupportedOperationException.");
+ } catch (final UnsupportedOperationException expected) { }
+ }
+
+ @Test
+ public void shouldNotAllowCloseForTimestampedWindowStore() {
+ final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
+ try {
+ store.close();
+ fail("Should have thrown UnsupportedOperationException.");
+ } catch (final UnsupportedOperationException expected) { }
+ }
+
+ @Test
+ public void shouldNotAllowCloseForSessionStore() {
+ final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
try {
store.close();
fail("Should have thrown UnsupportedOperationException.");
- } catch (final UnsupportedOperationException expected) {
- }
+ } catch (final UnsupportedOperationException expected) { }
}
-}
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 9b36ec7..fe4d948 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
@@ -70,7 +71,9 @@ public class ProcessorContextImplTest {
private boolean removeExecuted;
private KeyValueIterator<String, Long> rangeIter;
+ private KeyValueIterator<String, ValueAndTimestamp<Long>> timestampedRangeIter;
private KeyValueIterator<String, Long> allIter;
+ private KeyValueIterator<String, ValueAndTimestamp<Long>> timestampedAllIter;
private final List<KeyValueIterator<Windowed<String>, Long>> iters = new ArrayList<>(7);
private final List<KeyValueIterator<Windowed<String>, ValueAndTimestamp<Long>>> timestampedIters = new ArrayList<>(7);
@@ -86,7 +89,9 @@ public class ProcessorContextImplTest {
removeExecuted = false;
rangeIter = mock(KeyValueIterator.class);
+ timestampedRangeIter = mock(KeyValueIterator.class);
allIter = mock(KeyValueIterator.class);
+ timestampedAllIter = mock(KeyValueIterator.class);
windowStoreIter = mock(WindowStoreIterator.class);
for (int i = 0; i < 7; i++) {
@@ -103,12 +108,14 @@ public class ProcessorContextImplTest {
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock());
+ expect(stateManager.getGlobalStore("GlobalTimestampedKeyValueStore")).andReturn(timestampedKeyValueStoreMock());
expect(stateManager.getGlobalStore("GlobalWindowStore")).andReturn(windowStoreMock());
expect(stateManager.getGlobalStore("GlobalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock());
expect(stateManager.getGlobalStore("GlobalSessionStore")).andReturn(sessionStoreMock());
expect(stateManager.getGlobalStore(anyString())).andReturn(null);
expect(stateManager.getStore("LocalKeyValueStore")).andReturn(keyValueStoreMock());
+ expect(stateManager.getStore("LocalTimestampedKeyValueStore")).andReturn(timestampedKeyValueStoreMock());
expect(stateManager.getStore("LocalWindowStore")).andReturn(windowStoreMock());
expect(stateManager.getStore("LocalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock());
expect(stateManager.getStore("LocalSessionStore")).andReturn(sessionStoreMock());
@@ -128,6 +135,7 @@ public class ProcessorContextImplTest {
context.setCurrentNode(new ProcessorNode<String, Long>("fake", null,
new HashSet<>(asList(
"LocalKeyValueStore",
+ "LocalTimestampedKeyValueStore",
"LocalWindowStore",
"LocalTimestampedWindowStore",
"LocalSessionStore"))));
@@ -152,6 +160,24 @@ public class ProcessorContextImplTest {
}
@Test
+ public void globalTimestampedKeyValueStoreShouldBeReadOnly() {
+ doTest("GlobalTimestampedKeyValueStore", (Consumer<TimestampedKeyValueStore<String, Long>>) store -> {
+ verifyStoreCannotBeInitializedOrClosed(store);
+
+ checkThrowsUnsupportedOperation(store::flush, "flush()");
+ checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 2L)), "put()");
+ checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", ValueAndTimestamp.make(1L, 2L)), "putIfAbsent()");
+ checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll()");
+ checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete()");
+
+ assertEquals(VALUE_AND_TIMESTAMP, store.get(KEY));
+ assertEquals(timestampedRangeIter, store.range("one", "two"));
+ assertEquals(timestampedAllIter, store.all());
+ assertEquals(VALUE, store.approximateNumEntries());
+ });
+ }
+
+ @Test
public void globalWindowStoreShouldBeReadOnly() {
doTest("GlobalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
verifyStoreCannotBeInitializedOrClosed(store);
@@ -229,6 +255,33 @@ public class ProcessorContextImplTest {
}
@Test
+ public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() {
+ doTest("LocalTimestampedKeyValueStore", (Consumer<TimestampedKeyValueStore<String, Long>>) store -> {
+ verifyStoreCannotBeInitializedOrClosed(store);
+
+ store.flush();
+ assertTrue(flushExecuted);
+
+ store.put("1", ValueAndTimestamp.make(1L, 2L));
+ assertTrue(putExecuted);
+
+ store.putIfAbsent("1", ValueAndTimestamp.make(1L, 2L));
+ assertTrue(putIfAbsentExecuted);
+
+ store.putAll(Collections.emptyList());
+ assertTrue(putAllExecuted);
+
+ store.delete("1");
+ assertTrue(deleteExecuted);
+
+ assertEquals(VALUE_AND_TIMESTAMP, store.get(KEY));
+ assertEquals(timestampedRangeIter, store.range("one", "two"));
+ assertEquals(timestampedAllIter, store.all());
+ assertEquals(VALUE, store.approximateNumEntries());
+ });
+ }
+
+ @Test
public void localWindowStoreShouldNotAllowInitOrClose() {
doTest("LocalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
verifyStoreCannotBeInitializedOrClosed(store);
@@ -333,6 +386,48 @@ public class ProcessorContextImplTest {
}
@SuppressWarnings("unchecked")
+ private TimestampedKeyValueStore<String, Long> timestampedKeyValueStoreMock() {
+ final TimestampedKeyValueStore<String, Long> timestampedKeyValueStoreMock = mock(TimestampedKeyValueStore.class);
+
+ initStateStoreMock(timestampedKeyValueStoreMock);
+
+ expect(timestampedKeyValueStoreMock.get(KEY)).andReturn(VALUE_AND_TIMESTAMP);
+ expect(timestampedKeyValueStoreMock.approximateNumEntries()).andReturn(VALUE);
+
+ expect(timestampedKeyValueStoreMock.range("one", "two")).andReturn(timestampedRangeIter);
+ expect(timestampedKeyValueStoreMock.all()).andReturn(timestampedAllIter);
+
+
+ timestampedKeyValueStoreMock.put(anyString(), anyObject(ValueAndTimestamp.class));
+ expectLastCall().andAnswer(() -> {
+ putExecuted = true;
+ return null;
+ });
+
+ timestampedKeyValueStoreMock.putIfAbsent(anyString(), anyObject(ValueAndTimestamp.class));
+ expectLastCall().andAnswer(() -> {
+ putIfAbsentExecuted = true;
+ return null;
+ });
+
+ timestampedKeyValueStoreMock.putAll(anyObject(List.class));
+ expectLastCall().andAnswer(() -> {
+ putAllExecuted = true;
+ return null;
+ });
+
+ timestampedKeyValueStoreMock.delete(anyString());
+ expectLastCall().andAnswer(() -> {
+ deleteExecuted = true;
+ return null;
+ });
+
+ replay(timestampedKeyValueStoreMock);
+
+ return timestampedKeyValueStoreMock;
+ }
+
+ @SuppressWarnings("unchecked")
private WindowStore<String, Long> windowStoreMock() {
final WindowStore<String, Long> windowStore = mock(WindowStore.class);