You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/03/06 18:56:53 UTC

[kafka] branch trunk updated: KAFKA-8006: Guard calls to init and close from global processor (#6353)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8662efb  KAFKA-8006: Guard calls to init and close from global processor (#6353)
8662efb is described below

commit 8662efbad24c859aeee5c3b469dbc907e39ff82d
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Wed Mar 6 10:56:44 2019 -0800

    KAFKA-8006: Guard calls to init and close from global processor (#6353)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../internals/GlobalProcessorContextImpl.java      | 19 ++++++++++++++++++-
 .../processor/internals/ProcessorContextImpl.java  | 12 ++++++------
 .../internals/GlobalProcessorContextImplTest.java  | 22 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 2f58836..900cc71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -23,7 +23,13 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.KeyValueStoreReadWriteDecorator;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.SessionStoreReadWriteDecorator;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.WindowStoreReadWriteDecorator;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.time.Duration;
@@ -39,9 +45,20 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
         super(new TaskId(-1, -1), config, metrics, stateMgr, cache);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public StateStore getStateStore(final String name) {
-        return stateManager.getGlobalStore(name);
+        final StateStore store = stateManager.getGlobalStore(name);
+
+        if (store instanceof KeyValueStore) {
+            return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
+        } else if (store instanceof WindowStore) {
+            return new WindowStoreReadWriteDecorator((WindowStore) store);
+        } else if (store instanceof SessionStore) {
+            return new SessionStoreReadWriteDecorator((SessionStore) store);
+        }
+
+        return store;
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c1c3a60..36a3750 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -411,11 +411,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private static class KeyValueStoreReadWriteDecorator<K, V>
+    static class KeyValueStoreReadWriteDecorator<K, V>
         extends StateStoreReadWriteDecorator<KeyValueStore<K, V>, K, V>
         implements KeyValueStore<K, V> {
 
-        private KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
+        KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
             super(inner);
         }
 
@@ -463,11 +463,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private static class WindowStoreReadWriteDecorator<K, V>
+    static class WindowStoreReadWriteDecorator<K, V>
         extends StateStoreReadWriteDecorator<WindowStore<K, V>, K, V>
         implements WindowStore<K, V> {
 
-        private WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
+        WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
             super(inner);
         }
 
@@ -520,11 +520,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private static class SessionStoreReadWriteDecorator<K, AGG>
+    static class SessionStoreReadWriteDecorator<K, AGG>
         extends StateStoreReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
         implements SessionStore<K, AGG> {
 
-        private SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
+        SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
             super(inner);
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index cee7d48..deb14e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.hamcrest.core.IsInstanceOf;
@@ -34,6 +35,7 @@ import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 public class GlobalProcessorContextImplTest {
     private static final String GLOBAL_STORE_NAME = "global-store";
@@ -129,4 +131,24 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowToSchedulePunctuations() {
         globalContext.schedule(null, null, null);
     }
+
+    @Test
+    public void shouldNotAllowInit() {
+        final StateStore store = globalContext.getStateStore(GLOBAL_STORE_NAME);
+        try {
+            store.init(null, null);
+            fail("Should have thrown UnsupportedOperationException.");
+        } catch (final UnsupportedOperationException expected) {
+        }
+    }
+
+    @Test
+    public void shouldNotAllowClose() {
+        final StateStore store = globalContext.getStateStore(GLOBAL_STORE_NAME);
+        try {
+            store.close();
+            fail("Should have thrown UnsupportedOperationException.");
+        } catch (final UnsupportedOperationException expected) {
+        }
+    }
 }