You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/03/06 18:56:53 UTC
[kafka] branch trunk updated: KAFKA-8006: Guard calls to init and close from global processor (#6353)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8662efb KAFKA-8006: Guard calls to init and close from global processor (#6353)
8662efb is described below
commit 8662efbad24c859aeee5c3b469dbc907e39ff82d
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Wed Mar 6 10:56:44 2019 -0800
KAFKA-8006: Guard calls to init and close from global processor (#6353)
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../internals/GlobalProcessorContextImpl.java | 19 ++++++++++++++++++-
.../processor/internals/ProcessorContextImpl.java | 12 ++++++------
.../internals/GlobalProcessorContextImplTest.java | 22 ++++++++++++++++++++++
3 files changed, 46 insertions(+), 7 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 2f58836..900cc71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -23,7 +23,13 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.KeyValueStoreReadWriteDecorator;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.SessionStoreReadWriteDecorator;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.WindowStoreReadWriteDecorator;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.time.Duration;
@@ -39,9 +45,20 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
super(new TaskId(-1, -1), config, metrics, stateMgr, cache);
}
+ @SuppressWarnings("unchecked")
@Override
public StateStore getStateStore(final String name) {
- return stateManager.getGlobalStore(name);
+ final StateStore store = stateManager.getGlobalStore(name);
+
+ if (store instanceof KeyValueStore) {
+ return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
+ } else if (store instanceof WindowStore) {
+ return new WindowStoreReadWriteDecorator((WindowStore) store);
+ } else if (store instanceof SessionStore) {
+ return new SessionStoreReadWriteDecorator((SessionStore) store);
+ }
+
+ return store;
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c1c3a60..36a3750 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -411,11 +411,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
- private static class KeyValueStoreReadWriteDecorator<K, V>
+ static class KeyValueStoreReadWriteDecorator<K, V>
extends StateStoreReadWriteDecorator<KeyValueStore<K, V>, K, V>
implements KeyValueStore<K, V> {
- private KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
+ KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
super(inner);
}
@@ -463,11 +463,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
- private static class WindowStoreReadWriteDecorator<K, V>
+ static class WindowStoreReadWriteDecorator<K, V>
extends StateStoreReadWriteDecorator<WindowStore<K, V>, K, V>
implements WindowStore<K, V> {
- private WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
+ WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
super(inner);
}
@@ -520,11 +520,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
- private static class SessionStoreReadWriteDecorator<K, AGG>
+ static class SessionStoreReadWriteDecorator<K, AGG>
extends StateStoreReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
implements SessionStore<K, AGG> {
- private SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
+ SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
super(inner);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index cee7d48..deb14e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;
import org.hamcrest.core.IsInstanceOf;
@@ -34,6 +35,7 @@ import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
public class GlobalProcessorContextImplTest {
private static final String GLOBAL_STORE_NAME = "global-store";
@@ -129,4 +131,24 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowToSchedulePunctuations() {
globalContext.schedule(null, null, null);
}
+
+ @Test
+ public void shouldNotAllowInit() {
+ final StateStore store = globalContext.getStateStore(GLOBAL_STORE_NAME);
+ try {
+ store.init(null, null);
+ fail("Should have thrown UnsupportedOperationException.");
+ } catch (final UnsupportedOperationException expected) {
+ }
+ }
+
+ @Test
+ public void shouldNotAllowClose() {
+ final StateStore store = globalContext.getStateStore(GLOBAL_STORE_NAME);
+ try {
+ store.close();
+ fail("Should have thrown UnsupportedOperationException.");
+ } catch (final UnsupportedOperationException expected) {
+ }
+ }
}