You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/12 16:28:31 UTC

[kafka] branch trunk updated: KAFKA-3522: add missing guards for TimestampedXxxStore (#6356)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ab7ea07  KAFKA-3522: add missing guards for TimestampedXxxStore (#6356)
ab7ea07 is described below

commit ab7ea07f5e57ec405dc7fddce95de7c639a2fd6e
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Tue Mar 12 09:28:14 2019 -0700

    KAFKA-3522: add missing guards for TimestampedXxxStore (#6356)
    
    Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
 .../processor/internals/ProcessorContextImpl.java  |  27 +++++-
 .../internals/GlobalProcessorContextImplTest.java  | 106 ++++++++++++++++++---
 .../internals/ProcessorContextImplTest.java        |  95 ++++++++++++++++++
 3 files changed, 215 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c10ea09..5f32a3b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
@@ -84,7 +85,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
         final StateStore global = stateManager.getGlobalStore(name);
         if (global != null) {
-            if (global instanceof KeyValueStore) {
+            if (global instanceof TimestampedKeyValueStore) {
+                return new TimestampedKeyValueStoreReadOnlyDecorator((TimestampedKeyValueStore) global);
+            } else if (global instanceof KeyValueStore) {
                 return new KeyValueStoreReadOnlyDecorator((KeyValueStore) global);
             } else if (global instanceof TimestampedWindowStore) {
                 return new TimestampedWindowStoreReadOnlyDecorator((TimestampedWindowStore) global);
@@ -108,7 +111,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         final StateStore store = stateManager.getStore(name);
-        if (store instanceof KeyValueStore) {
+        if (store instanceof TimestampedKeyValueStore) {
+            return new TimestampedKeyValueStoreReadWriteDecorator((TimestampedKeyValueStore) store);
+        } else if (store instanceof KeyValueStore) {
             return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
         } else if (store instanceof TimestampedWindowStore) {
             return new TimestampedWindowStoreReadWriteDecorator((TimestampedWindowStore) store);
@@ -294,6 +299,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
+    private static class TimestampedKeyValueStoreReadOnlyDecorator<K, V> // wrapper handed out by getStateStore() when a global store is a TimestampedKeyValueStore
+        extends KeyValueStoreReadOnlyDecorator<K, ValueAndTimestamp<V>> // reuses the plain key-value read-only decorator with ValueAndTimestamp<V> as the value type
+        implements TimestampedKeyValueStore<K, V> { // so the wrapper is itself usable as a TimestampedKeyValueStore
+
+        private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore<K, V> inner) { // inner: the store being wrapped
+            super(inner); // no members of its own; all behavior comes from the parent decorator
+        }
+    }
+
     private static class WindowStoreReadOnlyDecorator<K, V>
         extends StateStoreReadOnlyDecorator<WindowStore<K, V>, K, V>
         implements WindowStore<K, V> {
@@ -484,6 +498,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
+    private static class TimestampedKeyValueStoreReadWriteDecorator<K, V> // wrapper handed out by getStateStore() when a non-global store is a TimestampedKeyValueStore
+        extends KeyValueStoreReadWriteDecorator<K, ValueAndTimestamp<V>> // reuses the plain key-value read-write decorator with ValueAndTimestamp<V> as the value type
+        implements TimestampedKeyValueStore<K, V> { // so the wrapper is itself usable as a TimestampedKeyValueStore
+
+        private TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) { // inner: the store being wrapped
+            super(inner); // no members of its own; all behavior comes from the parent decorator
+        }
+    }
+
     static class WindowStoreReadWriteDecorator<K, V>
         extends StateStoreReadWriteDecorator<WindowStore<K, V>, K, V>
         implements WindowStore<K, V> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 4153cca..b36557c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -21,6 +21,10 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowStore;
 import org.hamcrest.core.IsInstanceOf;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,6 +43,11 @@ import static org.junit.Assert.fail;
 
 public class GlobalProcessorContextImplTest {
     private static final String GLOBAL_STORE_NAME = "global-store";
+    private static final String GLOBAL_KEY_VALUE_STORE_NAME = "global-key-value-store";
+    private static final String GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME = "global-timestamped-key-value-store";
+    private static final String GLOBAL_WINDOW_STORE_NAME = "global-window-store";
+    private static final String GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME = "global-timestamped-window-store";
+    private static final String GLOBAL_SESSION_STORE_NAME = "global-session-store";
     private static final String UNKNOWN_STORE = "unknown-store";
     private static final String CHILD_PROCESSOR = "child";
 
@@ -56,7 +65,12 @@ public class GlobalProcessorContextImplTest {
         replay(streamsConfig);
 
         final StateManager stateManager = mock(StateManager.class);
-        expect(stateManager.getGlobalStore(GLOBAL_STORE_NAME)).andReturn(mock(KeyValueStore.class));
+        expect(stateManager.getGlobalStore(GLOBAL_STORE_NAME)).andReturn(mock(StateStore.class));
+        expect(stateManager.getGlobalStore(GLOBAL_KEY_VALUE_STORE_NAME)).andReturn(mock(KeyValueStore.class));
+        expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME)).andReturn(mock(TimestampedKeyValueStore.class));
+        expect(stateManager.getGlobalStore(GLOBAL_WINDOW_STORE_NAME)).andReturn(mock(WindowStore.class));
+        expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME)).andReturn(mock(TimestampedWindowStore.class));
+        expect(stateManager.getGlobalStore(GLOBAL_SESSION_STORE_NAME)).andReturn(mock(SessionStore.class));
         expect(stateManager.getGlobalStore(UNKNOWN_STORE)).andReturn(null);
         replay(stateManager);
 
@@ -86,7 +100,7 @@ public class GlobalProcessorContextImplTest {
 
     @Test
     public void shouldReturnGlobalOrNullStore() {
-        assertThat(globalContext.getStateStore(GLOBAL_STORE_NAME), new IsInstanceOf(KeyValueStore.class));
+        assertThat(globalContext.getStateStore(GLOBAL_STORE_NAME), new IsInstanceOf(StateStore.class));
         assertNull(globalContext.getStateStore(UNKNOWN_STORE));
     }
 
@@ -135,22 +149,92 @@ public class GlobalProcessorContextImplTest {
     }
 
     @Test
-    public void shouldNotAllowInit() {
-        final StateStore store = globalContext.getStateStore(GLOBAL_STORE_NAME);
+    public void shouldNotAllowInitForKeyValueStore() {
+        final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
         try {
             store.init(null, null);
             fail("Should have thrown UnsupportedOperationException.");
-        } catch (final UnsupportedOperationException expected) {
-        }
+        } catch (final UnsupportedOperationException expected) { }
     }
 
     @Test
-    public void shouldNotAllowClose() {
-        final StateStore store = globalContext.getStateStore(GLOBAL_STORE_NAME);
+    public void shouldNotAllowInitForTimestampedKeyValueStore() { // init() must be rejected on the store returned for the mocked TimestampedKeyValueStore
+        final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
+        try {
+            store.init(null, null);
+            fail("Should have thrown UnsupportedOperationException."); // reached only if init() did not throw
+        } catch (final UnsupportedOperationException expected) { } // the exception is the passing outcome
+    }
+
+    @Test
+    public void shouldNotAllowInitForWindowStore() { // init() must be rejected on the store returned for the mocked WindowStore
+        final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
+        try {
+            store.init(null, null);
+            fail("Should have thrown UnsupportedOperationException."); // reached only if init() did not throw
+        } catch (final UnsupportedOperationException expected) { } // the exception is the passing outcome
+    }
+
+    @Test
+    public void shouldNotAllowInitForTimestampedWindowStore() { // init() must be rejected on the store returned for the mocked TimestampedWindowStore
+        final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
+        try {
+            store.init(null, null);
+            fail("Should have thrown UnsupportedOperationException."); // reached only if init() did not throw
+        } catch (final UnsupportedOperationException expected) { } // the exception is the passing outcome
+    }
+
+    @Test
+    public void shouldNotAllowInitForSessionStore() { // init() must be rejected on the store returned for the mocked SessionStore
+        final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
+        try {
+            store.init(null, null);
+            fail("Should have thrown UnsupportedOperationException."); // reached only if init() did not throw
+        } catch (final UnsupportedOperationException expected) { } // the exception is the passing outcome
+    }
+
+    @Test
+    public void shouldNotAllowCloseForKeyValueStore() { // close() must be rejected on the store returned for the mocked KeyValueStore
+        final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
+        try {
+            store.close();
+            fail("Should have thrown UnsupportedOperationException."); // reached only if close() did not throw
+        } catch (final UnsupportedOperationException expected) { } // the exception is the passing outcome
+    }
+
+    @Test
+    public void shouldNotAllowCloseForTimestampedKeyValueStore() { // close() must be rejected on the store returned for the mocked TimestampedKeyValueStore
+        final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
+        try {
+            store.close();
+            fail("Should have thrown UnsupportedOperationException."); // reached only if close() did not throw
+        } catch (final UnsupportedOperationException expected) { } // the exception is the passing outcome
+    }
+
+    @Test
+    public void shouldNotAllowCloseForWindowStore() { // close() must be rejected on the store returned for the mocked WindowStore
+        final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
+        try {
+            store.close();
+            fail("Should have thrown UnsupportedOperationException."); // reached only if close() did not throw
+        } catch (final UnsupportedOperationException expected) { } // the exception is the passing outcome
+    }
+
+    @Test
+    public void shouldNotAllowCloseForTimestampedWindowStore() { // close() must be rejected on the store returned for the mocked TimestampedWindowStore
+        final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
+        try {
+            store.close();
+            fail("Should have thrown UnsupportedOperationException."); // reached only if close() did not throw
+        } catch (final UnsupportedOperationException expected) { } // the exception is the passing outcome
+    }
+
+    @Test
+    public void shouldNotAllowCloseForSessionStore() {
+        final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
         try {
             store.close();
             fail("Should have thrown UnsupportedOperationException.");
-        } catch (final UnsupportedOperationException expected) {
-        }
+        } catch (final UnsupportedOperationException expected) { }
     }
-}
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 9b36ec7..fe4d948 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
@@ -70,7 +71,9 @@ public class ProcessorContextImplTest {
     private boolean removeExecuted;
 
     private KeyValueIterator<String, Long> rangeIter;
+    private KeyValueIterator<String, ValueAndTimestamp<Long>> timestampedRangeIter;
     private KeyValueIterator<String, Long> allIter;
+    private KeyValueIterator<String, ValueAndTimestamp<Long>> timestampedAllIter;
 
     private final List<KeyValueIterator<Windowed<String>, Long>> iters = new ArrayList<>(7);
     private final List<KeyValueIterator<Windowed<String>, ValueAndTimestamp<Long>>> timestampedIters = new ArrayList<>(7);
@@ -86,7 +89,9 @@ public class ProcessorContextImplTest {
         removeExecuted = false;
 
         rangeIter = mock(KeyValueIterator.class);
+        timestampedRangeIter = mock(KeyValueIterator.class);
         allIter = mock(KeyValueIterator.class);
+        timestampedAllIter = mock(KeyValueIterator.class);
         windowStoreIter = mock(WindowStoreIterator.class);
 
         for (int i = 0; i < 7; i++) {
@@ -103,12 +108,14 @@ public class ProcessorContextImplTest {
         final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
 
         expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock());
+        expect(stateManager.getGlobalStore("GlobalTimestampedKeyValueStore")).andReturn(timestampedKeyValueStoreMock());
         expect(stateManager.getGlobalStore("GlobalWindowStore")).andReturn(windowStoreMock());
         expect(stateManager.getGlobalStore("GlobalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock());
         expect(stateManager.getGlobalStore("GlobalSessionStore")).andReturn(sessionStoreMock());
         expect(stateManager.getGlobalStore(anyString())).andReturn(null);
 
         expect(stateManager.getStore("LocalKeyValueStore")).andReturn(keyValueStoreMock());
+        expect(stateManager.getStore("LocalTimestampedKeyValueStore")).andReturn(timestampedKeyValueStoreMock());
         expect(stateManager.getStore("LocalWindowStore")).andReturn(windowStoreMock());
         expect(stateManager.getStore("LocalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock());
         expect(stateManager.getStore("LocalSessionStore")).andReturn(sessionStoreMock());
@@ -128,6 +135,7 @@ public class ProcessorContextImplTest {
         context.setCurrentNode(new ProcessorNode<String, Long>("fake", null,
             new HashSet<>(asList(
                 "LocalKeyValueStore",
+                "LocalTimestampedKeyValueStore",
                 "LocalWindowStore",
                 "LocalTimestampedWindowStore",
                 "LocalSessionStore"))));
@@ -152,6 +160,24 @@ public class ProcessorContextImplTest {
     }
 
     @Test
+    public void globalTimestampedKeyValueStoreShouldBeReadOnly() { // covers the wrapper handed out for the "GlobalTimestampedKeyValueStore" mock
+        doTest("GlobalTimestampedKeyValueStore", (Consumer<TimestampedKeyValueStore<String, Long>>) store -> { // doTest is defined outside this hunk; presumably it looks the store up by name and feeds it to the consumer
+            verifyStoreCannotBeInitializedOrClosed(store);
+
+            checkThrowsUnsupportedOperation(store::flush, "flush()"); // this call and the four below exercise the write path; each is expected to throw
+            checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 2L)), "put()");
+            checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", ValueAndTimestamp.make(1L, 2L)), "putIfAbsent()");
+            checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll()");
+            checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete()");
+
+            assertEquals(VALUE_AND_TIMESTAMP, store.get(KEY)); // read calls must return exactly what timestampedKeyValueStoreMock() was primed with
+            assertEquals(timestampedRangeIter, store.range("one", "two"));
+            assertEquals(timestampedAllIter, store.all());
+            assertEquals(VALUE, store.approximateNumEntries());
+        });
+    }
+
+    @Test
     public void globalWindowStoreShouldBeReadOnly() {
         doTest("GlobalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
             verifyStoreCannotBeInitializedOrClosed(store);
@@ -229,6 +255,33 @@ public class ProcessorContextImplTest {
     }
 
     @Test
+    public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() { // local store: init/close are blocked, every other call must reach the mock
+        doTest("LocalTimestampedKeyValueStore", (Consumer<TimestampedKeyValueStore<String, Long>>) store -> {
+            verifyStoreCannotBeInitializedOrClosed(store);
+
+            store.flush();
+            assertTrue(flushExecuted); // not set in timestampedKeyValueStoreMock(); presumably wired by initStateStoreMock() — confirm
+
+            store.put("1", ValueAndTimestamp.make(1L, 2L));
+            assertTrue(putExecuted); // flipped by the answer attached to the mock's put()
+
+            store.putIfAbsent("1", ValueAndTimestamp.make(1L, 2L));
+            assertTrue(putIfAbsentExecuted); // flipped by the answer attached to the mock's putIfAbsent()
+
+            store.putAll(Collections.emptyList());
+            assertTrue(putAllExecuted); // flipped by the answer attached to the mock's putAll()
+
+            store.delete("1");
+            assertTrue(deleteExecuted); // flipped by the answer attached to the mock's delete()
+
+            assertEquals(VALUE_AND_TIMESTAMP, store.get(KEY)); // read calls must return exactly what the mock was primed with
+            assertEquals(timestampedRangeIter, store.range("one", "two"));
+            assertEquals(timestampedAllIter, store.all());
+            assertEquals(VALUE, store.approximateNumEntries());
+        });
+    }
+
+    @Test
     public void localWindowStoreShouldNotAllowInitOrClose() {
         doTest("LocalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
             verifyStoreCannotBeInitializedOrClosed(store);
@@ -333,6 +386,48 @@ public class ProcessorContextImplTest {
     }
 
     @SuppressWarnings("unchecked")
+    private TimestampedKeyValueStore<String, Long> timestampedKeyValueStoreMock() { // builds a replayed mock primed for the timestamped key-value store tests
+        final TimestampedKeyValueStore<String, Long> timestampedKeyValueStoreMock = mock(TimestampedKeyValueStore.class); // raw class literal, hence the unchecked assignment
+
+        initStateStoreMock(timestampedKeyValueStoreMock); // shared helper defined outside this hunk
+
+        expect(timestampedKeyValueStoreMock.get(KEY)).andReturn(VALUE_AND_TIMESTAMP); // canned read results that the tests assert against
+        expect(timestampedKeyValueStoreMock.approximateNumEntries()).andReturn(VALUE);
+
+        expect(timestampedKeyValueStoreMock.range("one", "two")).andReturn(timestampedRangeIter);
+        expect(timestampedKeyValueStoreMock.all()).andReturn(timestampedAllIter);
+
+
+        timestampedKeyValueStoreMock.put(anyString(), anyObject(ValueAndTimestamp.class)); // record the call; the answer below flips a flag so tests can see the mock was reached
+        expectLastCall().andAnswer(() -> {
+            putExecuted = true;
+            return null;
+        });
+
+        timestampedKeyValueStoreMock.putIfAbsent(anyString(), anyObject(ValueAndTimestamp.class)); // same record-then-answer pattern for putIfAbsent()
+        expectLastCall().andAnswer(() -> {
+            putIfAbsentExecuted = true;
+            return null;
+        });
+
+        timestampedKeyValueStoreMock.putAll(anyObject(List.class)); // same pattern for putAll()
+        expectLastCall().andAnswer(() -> {
+            putAllExecuted = true;
+            return null;
+        });
+
+        timestampedKeyValueStoreMock.delete(anyString()); // same pattern for delete()
+        expectLastCall().andAnswer(() -> {
+            deleteExecuted = true;
+            return null;
+        });
+
+        replay(timestampedKeyValueStoreMock); // finish recording expectations before the mock is handed to the caller
+
+        return timestampedKeyValueStoreMock;
+    }
+
+    @SuppressWarnings("unchecked")
     private WindowStore<String, Long> windowStoreMock() {
         final WindowStore<String, Long> windowStore = mock(WindowStore.class);