You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/05/18 21:51:32 UTC
[kafka] branch trunk updated: MINOR: consolidate processor context
for active/standby (#8669)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 392e49b MINOR: consolidate processor context for active/standby (#8669)
392e49b is described below
commit 392e49b1eddd2fcd8d09177cc80abc8a51f5c991
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Mon May 18 14:50:54 2020 -0700
MINOR: consolidate processor context for active/standby (#8669)
This is a prerequisite for KAFKA-9501 and will also be useful for KAFKA-9603
There should be no logical changes here: the main difference is the removal of StandbyContextImpl in preparation for contexts to transition between active and standby.
Also includes some minor cleanup, eg pulling the ReadOnly/ReadWrite decorators out into a separate file.
Reviewers: Bruno Cadonna <br...@confluent.io>, John Roesler <vv...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
.../internals/AbstractProcessorContext.java | 10 +-
.../internals/AbstractReadOnlyDecorator.java | 252 ++++++++++
.../internals/AbstractReadWriteDecorator.java | 248 ++++++++++
.../internals/GlobalProcessorContextImpl.java | 37 +-
.../internals/GlobalStateManagerImpl.java | 7 +-
.../internals/InternalProcessorContext.java | 19 +-
.../processor/internals/ProcessorContextImpl.java | 548 +++++----------------
.../processor/internals/ProcessorStateManager.java | 4 +-
.../processor/internals/StandbyContextImpl.java | 190 -------
.../streams/processor/internals/StandbyTask.java | 2 +-
.../streams/processor/internals/StateManager.java | 4 +-
.../state/internals/CachingKeyValueStore.java | 2 +-
.../state/internals/CachingSessionStore.java | 2 +-
.../state/internals/CachingWindowStore.java | 2 +-
.../internals/ChangeLoggingKeyValueBytesStore.java | 14 +-
.../internals/ChangeLoggingSessionBytesStore.java | 20 +-
...ChangeLoggingTimestampedKeyValueBytesStore.java | 4 +-
.../ChangeLoggingTimestampedWindowBytesStore.java | 4 +-
.../internals/ChangeLoggingWindowBytesStore.java | 17 +-
.../streams/state/internals/StoreChangeLogger.java | 71 ---
.../internals/AbstractProcessorContextTest.java | 12 +-
.../internals/GlobalProcessorContextImplTest.java | 2 +
.../internals/ProcessorContextImplTest.java | 203 +++++++-
.../processor/internals/ProcessorContextTest.java | 9 +-
.../processor/internals/StandbyTaskTest.java | 5 +-
.../processor/internals/StateManagerStub.java | 6 +
.../streams/state/KeyValueStoreTestDriver.java | 2 +-
.../ChangeLoggingSessionBytesStoreTest.java | 29 +-
...angeLoggingTimestampedWindowBytesStoreTest.java | 37 +-
.../ChangeLoggingWindowBytesStoreTest.java | 40 +-
.../state/internals/StoreChangeLoggerTest.java | 84 ----
.../apache/kafka/test/GlobalStateManagerStub.java | 6 +
.../kafka/test/InternalMockProcessorContext.java | 23 +
.../kafka/test/MockInternalProcessorContext.java | 17 +-
.../apache/kafka/test/NoOpProcessorContext.java | 17 +-
.../apache/kafka/streams/TopologyTestDriver.java | 2 +-
.../streams/processor/MockProcessorContext.java | 1 +
37 files changed, 1020 insertions(+), 932 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index e684344..1132708 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -30,7 +31,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-
public abstract class AbstractProcessorContext implements InternalProcessorContext {
public static final String NONEXIST_TOPIC = "__null_topic__";
@@ -138,6 +138,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
if (recordContext == null) {
throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
}
+
return recordContext.partition();
}
@@ -205,7 +206,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
}
@Override
- public ThreadCache getCache() {
+ public ThreadCache cache() {
return cache;
}
@@ -218,4 +219,9 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
public void uninitialize() {
initialized = false;
}
+
+ @Override
+ public TaskType taskType() {
+ return stateManager.taskType();
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
new file mode 100644
index 0000000..a63cd99
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.List;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+abstract class AbstractReadOnlyDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> {
+
+ static final String ERROR_MESSAGE = "Global store is read only";
+
+ private AbstractReadOnlyDecorator(final T inner) {
+ super(inner);
+ }
+
+ @Override
+ public void flush() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public void init(final ProcessorContext context,
+ final StateStore root) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public void close() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ static StateStore getReadOnlyStore(final StateStore global) {
+ if (global instanceof TimestampedKeyValueStore) {
+ return new TimestampedKeyValueStoreReadOnlyDecorator<>((TimestampedKeyValueStore<?, ?>) global);
+ } else if (global instanceof KeyValueStore) {
+ return new KeyValueStoreReadOnlyDecorator<>((KeyValueStore<?, ?>) global);
+ } else if (global instanceof TimestampedWindowStore) {
+ return new TimestampedWindowStoreReadOnlyDecorator<>((TimestampedWindowStore<?, ?>) global);
+ } else if (global instanceof WindowStore) {
+ return new WindowStoreReadOnlyDecorator<>((WindowStore<?, ?>) global);
+ } else if (global instanceof SessionStore) {
+ return new SessionStoreReadOnlyDecorator<>((SessionStore<?, ?>) global);
+ } else {
+ return global;
+ }
+ }
+
+ static class KeyValueStoreReadOnlyDecorator<K, V>
+ extends AbstractReadOnlyDecorator<KeyValueStore<K, V>, K, V>
+ implements KeyValueStore<K, V> {
+
+ private KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) {
+ super(inner);
+ }
+
+ @Override
+ public V get(final K key) {
+ return wrapped().get(key);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(final K from,
+ final K to) {
+ return wrapped().range(from, to);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return wrapped().all();
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return wrapped().approximateNumEntries();
+ }
+
+ @Override
+ public void put(final K key,
+ final V value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public V putIfAbsent(final K key,
+ final V value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<K, V>> entries) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public V delete(final K key) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ }
+
+ static class TimestampedKeyValueStoreReadOnlyDecorator<K, V>
+ extends KeyValueStoreReadOnlyDecorator<K, ValueAndTimestamp<V>>
+ implements TimestampedKeyValueStore<K, V> {
+
+ private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore<K, V> inner) {
+ super(inner);
+ }
+ }
+
+ static class WindowStoreReadOnlyDecorator<K, V>
+ extends AbstractReadOnlyDecorator<WindowStore<K, V>, K, V>
+ implements WindowStore<K, V> {
+
+ private WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
+ super(inner);
+ }
+
+ @Deprecated
+ @Override
+ public void put(final K key,
+ final V value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public void put(final K key,
+ final V value,
+ final long windowStartTimestamp) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public V fetch(final K key,
+ final long time) {
+ return wrapped().fetch(key, time);
+ }
+
+ @Override
+ @Deprecated
+ public WindowStoreIterator<V> fetch(final K key,
+ final long timeFrom,
+ final long timeTo) {
+ return wrapped().fetch(key, timeFrom, timeTo);
+ }
+
+ @Override
+ @Deprecated
+ public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+ final K to,
+ final long timeFrom,
+ final long timeTo) {
+ return wrapped().fetch(from, to, timeFrom, timeTo);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> all() {
+ return wrapped().all();
+ }
+
+ @Override
+ @Deprecated
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+ final long timeTo) {
+ return wrapped().fetchAll(timeFrom, timeTo);
+ }
+ }
+
+ static class TimestampedWindowStoreReadOnlyDecorator<K, V>
+ extends WindowStoreReadOnlyDecorator<K, ValueAndTimestamp<V>>
+ implements TimestampedWindowStore<K, V> {
+
+ private TimestampedWindowStoreReadOnlyDecorator(final TimestampedWindowStore<K, V> inner) {
+ super(inner);
+ }
+ }
+
+ static class SessionStoreReadOnlyDecorator<K, AGG>
+ extends AbstractReadOnlyDecorator<SessionStore<K, AGG>, K, AGG>
+ implements SessionStore<K, AGG> {
+
+ private SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) {
+ super(inner);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+ final long earliestSessionEndTime,
+ final long latestSessionStartTime) {
+ return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
+ final K keyTo,
+ final long earliestSessionEndTime,
+ final long latestSessionStartTime) {
+ return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
+ }
+
+ @Override
+ public void remove(final Windowed<K> sessionKey) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public void put(final Windowed<K> sessionKey,
+ final AGG aggregate) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public AGG fetchSession(final K key, final long startTime, final long endTime) {
+ return wrapped().fetchSession(key, startTime, endTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+ return wrapped().fetch(key);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
+ final K to) {
+ return wrapped().fetch(from, to);
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
new file mode 100644
index 0000000..494d98e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.List;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> {
+ static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";
+
+ private AbstractReadWriteDecorator(final T inner) {
+ super(inner);
+ }
+
+ @Override
+ public void init(final ProcessorContext context,
+ final StateStore root) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public void close() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ static StateStore getReadWriteStore(final StateStore store) {
+ if (store instanceof TimestampedKeyValueStore) {
+ return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store);
+ } else if (store instanceof KeyValueStore) {
+ return new KeyValueStoreReadWriteDecorator<>((KeyValueStore<?, ?>) store);
+ } else if (store instanceof TimestampedWindowStore) {
+ return new TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>) store);
+ } else if (store instanceof WindowStore) {
+ return new WindowStoreReadWriteDecorator<>((WindowStore<?, ?>) store);
+ } else if (store instanceof SessionStore) {
+ return new SessionStoreReadWriteDecorator<>((SessionStore<?, ?>) store);
+ } else {
+ return store;
+ }
+ }
+
+ static class KeyValueStoreReadWriteDecorator<K, V>
+ extends AbstractReadWriteDecorator<KeyValueStore<K, V>, K, V>
+ implements KeyValueStore<K, V> {
+
+ KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
+ super(inner);
+ }
+
+ @Override
+ public V get(final K key) {
+ return wrapped().get(key);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(final K from,
+ final K to) {
+ return wrapped().range(from, to);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return wrapped().all();
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return wrapped().approximateNumEntries();
+ }
+
+ @Override
+ public void put(final K key,
+ final V value) {
+ wrapped().put(key, value);
+ }
+
+ @Override
+ public V putIfAbsent(final K key,
+ final V value) {
+ return wrapped().putIfAbsent(key, value);
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<K, V>> entries) {
+ wrapped().putAll(entries);
+ }
+
+ @Override
+ public V delete(final K key) {
+ return wrapped().delete(key);
+ }
+ }
+
+ static class TimestampedKeyValueStoreReadWriteDecorator<K, V>
+ extends KeyValueStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
+ implements TimestampedKeyValueStore<K, V> {
+
+ TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) {
+ super(inner);
+ }
+ }
+
+ static class WindowStoreReadWriteDecorator<K, V>
+ extends AbstractReadWriteDecorator<WindowStore<K, V>, K, V>
+ implements WindowStore<K, V> {
+
+ WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
+ super(inner);
+ }
+
+ @Deprecated
+ @Override
+ public void put(final K key,
+ final V value) {
+ wrapped().put(key, value);
+ }
+
+ @Override
+ public void put(final K key,
+ final V value,
+ final long windowStartTimestamp) {
+ wrapped().put(key, value, windowStartTimestamp);
+ }
+
+ @Override
+ public V fetch(final K key,
+ final long time) {
+ return wrapped().fetch(key, time);
+ }
+
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
+ @Override
+ public WindowStoreIterator<V> fetch(final K key,
+ final long timeFrom,
+ final long timeTo) {
+ return wrapped().fetch(key, timeFrom, timeTo);
+ }
+
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+ final K to,
+ final long timeFrom,
+ final long timeTo) {
+ return wrapped().fetch(from, to, timeFrom, timeTo);
+ }
+
+ @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+ final long timeTo) {
+ return wrapped().fetchAll(timeFrom, timeTo);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> all() {
+ return wrapped().all();
+ }
+ }
+
+ static class TimestampedWindowStoreReadWriteDecorator<K, V>
+ extends WindowStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
+ implements TimestampedWindowStore<K, V> {
+
+ TimestampedWindowStoreReadWriteDecorator(final TimestampedWindowStore<K, V> inner) {
+ super(inner);
+ }
+ }
+
+ static class SessionStoreReadWriteDecorator<K, AGG>
+ extends AbstractReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
+ implements SessionStore<K, AGG> {
+
+ SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
+ super(inner);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+ final long earliestSessionEndTime,
+ final long latestSessionStartTime) {
+ return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
+ final K keyTo,
+ final long earliestSessionEndTime,
+ final long latestSessionStartTime) {
+ return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
+ }
+
+ @Override
+ public void remove(final Windowed<K> sessionKey) {
+ wrapped().remove(sessionKey);
+ }
+
+ @Override
+ public void put(final Windowed<K> sessionKey,
+ final AGG aggregate) {
+ wrapped().put(sessionKey, aggregate);
+ }
+
+ @Override
+ public AGG fetchSession(final K key,
+ final long startTime,
+ final long endTime) {
+ return wrapped().fetchSession(key, startTime, endTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+ return wrapped().fetch(key);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
+ final K to) {
+ return wrapped().fetch(from, to);
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 859430c..81169d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.streams.processor.internals;
+import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
+
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
@@ -23,24 +26,13 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.KeyValueStoreReadWriteDecorator;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.SessionStoreReadWriteDecorator;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.TimestampedKeyValueStoreReadWriteDecorator;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.TimestampedWindowStoreReadWriteDecorator;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.WindowStoreReadWriteDecorator;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.time.Duration;
public class GlobalProcessorContextImpl extends AbstractProcessorContext {
-
public GlobalProcessorContextImpl(final StreamsConfig config,
final StateManager stateMgr,
final StreamsMetricsImpl metrics,
@@ -51,20 +43,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
@Override
public StateStore getStateStore(final String name) {
final StateStore store = stateManager.getGlobalStore(name);
-
- if (store instanceof TimestampedKeyValueStore) {
- return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store);
- } else if (store instanceof KeyValueStore) {
- return new KeyValueStoreReadWriteDecorator<>((KeyValueStore<?, ?>) store);
- } else if (store instanceof TimestampedWindowStore) {
- return new TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>) store);
- } else if (store instanceof WindowStore) {
- return new WindowStoreReadWriteDecorator<>((WindowStore<?, ?>) store);
- } else if (store instanceof SessionStore) {
- return new SessionStoreReadWriteDecorator<>((SessionStore<?, ?>) store);
- }
-
- return store;
+ return getReadWriteStore(store);
}
@SuppressWarnings("unchecked")
@@ -130,4 +109,12 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
}
+
+ @Override
+ public void logChange(final String storeName,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp) {
+ throw new UnsupportedOperationException("this should not happen: logChange() not supported in global processor context.");
+ }
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f17131d..1def55c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;
@@ -222,7 +223,6 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
} finally {
globalConsumer.unsubscribe();
}
-
}
private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
@@ -339,7 +339,6 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
}
}
-
@Override
public void close() throws IOException {
try {
@@ -394,6 +393,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
}
}
+ @Override
+ public TaskType taskType() {
+ return TaskType.GLOBAL;
+ }
@Override
public Map<TopicPartition, Long> changelogOffsets() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 145e889..db5cfc9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -16,9 +16,13 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -29,6 +33,8 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
* {@link ThreadCache}
*/
public interface InternalProcessorContext extends ProcessorContext {
+ BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();
+ ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer();
@Override
StreamsMetricsImpl metrics();
@@ -67,7 +73,7 @@ public interface InternalProcessorContext extends ProcessorContext {
/**
* Get the thread-global cache
*/
- ThreadCache getCache();
+ ThreadCache cache();
/**
* Mark this context as being initialized
@@ -80,10 +86,21 @@ public interface InternalProcessorContext extends ProcessorContext {
void uninitialize();
/**
+ * @return the type of task (active/standby/global) that this context corresponds to
+ */
+ TaskType taskType();
+
+ /**
* Get a correctly typed state store, given a handle on the original builder.
*/
@SuppressWarnings("unchecked")
default <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
return (T) getStateStore(builder.name());
}
+
+ void logChange(final String storeName,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp);
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index d390af5..c776367 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -16,87 +16,124 @@
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.streams.KeyValue;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
-import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
import java.time.Duration;
import java.util.List;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
+import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
-
- private final StreamTask task;
+ // The below are both null for standby tasks
+ private final StreamTask streamTask;
private final RecordCollector collector;
+
private final ToInternal toInternal = new ToInternal();
private final static To SEND_TO_ALL = To.all();
+ final Map<String, String> storeToChangelogTopic = new HashMap<>();
+
ProcessorContextImpl(final TaskId id,
- final StreamTask task,
+ final StreamTask streamTask,
final StreamsConfig config,
final RecordCollector collector,
final ProcessorStateManager stateMgr,
final StreamsMetricsImpl metrics,
final ThreadCache cache) {
super(id, config, metrics, stateMgr, cache);
- this.task = task;
+ this.streamTask = streamTask;
this.collector = collector;
+
+ if (streamTask == null && taskType() == TaskType.ACTIVE) {
+ throw new IllegalStateException("Tried to create context for active task but the StreamTask was null");
+ }
+ }
+
+ ProcessorContextImpl(final TaskId id,
+ final StreamsConfig config,
+ final ProcessorStateManager stateMgr,
+ final StreamsMetricsImpl metrics) {
+ this(
+ id,
+ null,
+ config,
+ null,
+ stateMgr,
+ metrics,
+ new ThreadCache(
+ new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
+ 0,
+ metrics
+ )
+ );
}
- public ProcessorStateManager getStateMgr() {
+ public ProcessorStateManager stateManager() {
return (ProcessorStateManager) stateManager;
}
@Override
+ public void register(final StateStore store,
+ final StateRestoreCallback stateRestoreCallback) {
+ storeToChangelogTopic.put(store.name(), ProcessorStateManager.storeChangelogTopic(applicationId(), store.name()));
+ super.register(store, stateRestoreCallback);
+ }
+
+ @Override
public RecordCollector recordCollector() {
return collector;
}
+ @Override
+ public void logChange(final String storeName,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp) {
+ throwUnsupportedOperationExceptionIfStandby("logChange");
+ // Sending null headers to changelog topics (KIP-244)
+ collector.send(
+ storeToChangelogTopic.get(storeName),
+ key,
+ value,
+ null,
+ taskId().partition,
+ timestamp,
+ BYTES_KEY_SERIALIZER,
+ BYTEARRAY_VALUE_SERIALIZER);
+ }
+
/**
* @throws StreamsException if an attempt is made to access this state store from an unknown node
+ * @throws UnsupportedOperationException if the current streamTask type is standby
*/
@Override
public StateStore getStateStore(final String name) {
+ throwUnsupportedOperationExceptionIfStandby("getStateStore");
if (currentNode() == null) {
throw new StreamsException("Accessing from an unknown node");
}
- final StateStore global = stateManager.getGlobalStore(name);
- if (global != null) {
- if (global instanceof TimestampedKeyValueStore) {
- return new TimestampedKeyValueStoreReadOnlyDecorator<>((TimestampedKeyValueStore<?, ?>) global);
- } else if (global instanceof KeyValueStore) {
- return new KeyValueStoreReadOnlyDecorator<>((KeyValueStore<?, ?>) global);
- } else if (global instanceof TimestampedWindowStore) {
- return new TimestampedWindowStoreReadOnlyDecorator<>((TimestampedWindowStore<?, ?>) global);
- } else if (global instanceof WindowStore) {
- return new WindowStoreReadOnlyDecorator<>((WindowStore<?, ?>) global);
- } else if (global instanceof SessionStore) {
- return new SessionStoreReadOnlyDecorator<>((SessionStore<?, ?>) global);
- }
-
- return global;
+ final StateStore globalStore = stateManager.getGlobalStore(name);
+ if (globalStore != null) {
+ return getReadOnlyStore(globalStore);
}
if (!currentNode().stateStores.contains(name)) {
@@ -110,24 +147,13 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
final StateStore store = stateManager.getStore(name);
- if (store instanceof TimestampedKeyValueStore) {
- return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store);
- } else if (store instanceof KeyValueStore) {
- return new KeyValueStoreReadWriteDecorator<>((KeyValueStore<?, ?>) store);
- } else if (store instanceof TimestampedWindowStore) {
- return new TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>) store);
- } else if (store instanceof WindowStore) {
- return new WindowStoreReadWriteDecorator<>((WindowStore<?, ?>) store);
- } else if (store instanceof SessionStore) {
- return new SessionStoreReadWriteDecorator<>((SessionStore<?, ?>) store);
- }
-
- return store;
+ return getReadWriteStore(store);
}
@Override
public <K, V> void forward(final K key,
final V value) {
+ throwUnsupportedOperationExceptionIfStandby("forward");
forward(key, value, SEND_TO_ALL);
}
@@ -136,6 +162,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
public <K, V> void forward(final K key,
final V value,
final int childIndex) {
+ throwUnsupportedOperationExceptionIfStandby("forward");
forward(
key,
value,
@@ -147,6 +174,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
public <K, V> void forward(final K key,
final V value,
final String childName) {
+ throwUnsupportedOperationExceptionIfStandby("forward");
forward(key, value, To.child(childName));
}
@@ -155,6 +183,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
public <K, V> void forward(final K key,
final V value,
final To to) {
+ throwUnsupportedOperationExceptionIfStandby("forward");
final ProcessorNode<?, ?> previousNode = currentNode();
final ProcessorRecordContext previousContext = recordContext;
@@ -198,7 +227,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@Override
public void commit() {
- task.requestCommit();
+ throwUnsupportedOperationExceptionIfStandby("commit");
+ streamTask.requestCommit();
}
@Override
@@ -206,10 +236,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
public Cancellable schedule(final long intervalMs,
final PunctuationType type,
final Punctuator callback) {
+ throwUnsupportedOperationExceptionIfStandby("schedule");
if (intervalMs < 1) {
throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
}
- return task.schedule(intervalMs, type, callback);
+ return streamTask.schedule(intervalMs, type, callback);
}
@SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this
@@ -217,414 +248,57 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
public Cancellable schedule(final Duration interval,
final PunctuationType type,
final Punctuator callback) throws IllegalArgumentException {
+ throwUnsupportedOperationExceptionIfStandby("schedule");
final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback);
}
- private abstract static class StateStoreReadOnlyDecorator<T extends StateStore, K, V>
- extends WrappedStateStore<T, K, V> {
-
- static final String ERROR_MESSAGE = "Global store is read only";
-
- private StateStoreReadOnlyDecorator(final T inner) {
- super(inner);
- }
-
- @Override
- public void flush() {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
-
- @Override
- public void init(final ProcessorContext context,
- final StateStore root) {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
-
- @Override
- public void close() {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
- }
-
- private static class KeyValueStoreReadOnlyDecorator<K, V>
- extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>, K, V>
- implements KeyValueStore<K, V> {
-
- private KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) {
- super(inner);
- }
-
- @Override
- public V get(final K key) {
- return wrapped().get(key);
- }
-
- @Override
- public KeyValueIterator<K, V> range(final K from,
- final K to) {
- return wrapped().range(from, to);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return wrapped().all();
- }
-
- @Override
- public long approximateNumEntries() {
- return wrapped().approximateNumEntries();
- }
-
- @Override
- public void put(final K key,
- final V value) {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
-
- @Override
- public V putIfAbsent(final K key,
- final V value) {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
-
- @Override
- public void putAll(final List<KeyValue<K, V>> entries) {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
-
- @Override
- public V delete(final K key) {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
- }
-
- private static class TimestampedKeyValueStoreReadOnlyDecorator<K, V>
- extends KeyValueStoreReadOnlyDecorator<K, ValueAndTimestamp<V>>
- implements TimestampedKeyValueStore<K, V> {
-
- private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore<K, V> inner) {
- super(inner);
- }
- }
-
- private static class WindowStoreReadOnlyDecorator<K, V>
- extends StateStoreReadOnlyDecorator<WindowStore<K, V>, K, V>
- implements WindowStore<K, V> {
-
- private WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
- super(inner);
- }
-
- @Deprecated
- @Override
- public void put(final K key,
- final V value) {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
-
- @Override
- public void put(final K key,
- final V value,
- final long windowStartTimestamp) {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
-
- @Override
- public V fetch(final K key,
- final long time) {
- return wrapped().fetch(key, time);
- }
-
- @Override
- @Deprecated
- public WindowStoreIterator<V> fetch(final K key,
- final long timeFrom,
- final long timeTo) {
- return wrapped().fetch(key, timeFrom, timeTo);
- }
-
- @Override
- @Deprecated
- public KeyValueIterator<Windowed<K>, V> fetch(final K from,
- final K to,
- final long timeFrom,
- final long timeTo) {
- return wrapped().fetch(from, to, timeFrom, timeTo);
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> all() {
- return wrapped().all();
- }
-
- @Override
- @Deprecated
- public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
- final long timeTo) {
- return wrapped().fetchAll(timeFrom, timeTo);
- }
- }
-
- private static class TimestampedWindowStoreReadOnlyDecorator<K, V>
- extends WindowStoreReadOnlyDecorator<K, ValueAndTimestamp<V>>
- implements TimestampedWindowStore<K, V> {
-
- private TimestampedWindowStoreReadOnlyDecorator(final TimestampedWindowStore<K, V> inner) {
- super(inner);
- }
+ @Override
+ public String topic() {
+ throwUnsupportedOperationExceptionIfStandby("topic");
+ return super.topic();
}
- private static class SessionStoreReadOnlyDecorator<K, AGG>
- extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>, K, AGG>
- implements SessionStore<K, AGG> {
-
- private SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) {
- super(inner);
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
- final long earliestSessionEndTime,
- final long latestSessionStartTime) {
- return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
- final K keyTo,
- final long earliestSessionEndTime,
- final long latestSessionStartTime) {
- return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
- }
-
- @Override
- public void remove(final Windowed<K> sessionKey) {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
-
- @Override
- public void put(final Windowed<K> sessionKey,
- final AGG aggregate) {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
-
- @Override
- public AGG fetchSession(final K key, final long startTime, final long endTime) {
- return wrapped().fetchSession(key, startTime, endTime);
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
- return wrapped().fetch(key);
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
- final K to) {
- return wrapped().fetch(from, to);
- }
+ @Override
+ public int partition() {
+ throwUnsupportedOperationExceptionIfStandby("partition");
+ return super.partition();
}
- private abstract static class StateStoreReadWriteDecorator<T extends StateStore, K, V>
- extends WrappedStateStore<T, K, V> {
-
- static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";
-
- private StateStoreReadWriteDecorator(final T inner) {
- super(inner);
- }
-
- @Override
- public void init(final ProcessorContext context,
- final StateStore root) {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
-
- @Override
- public void close() {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
+ @Override
+ public long offset() {
+ throwUnsupportedOperationExceptionIfStandby("offset");
+ return super.offset();
}
- static class KeyValueStoreReadWriteDecorator<K, V>
- extends StateStoreReadWriteDecorator<KeyValueStore<K, V>, K, V>
- implements KeyValueStore<K, V> {
-
- KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
- super(inner);
- }
-
- @Override
- public V get(final K key) {
- return wrapped().get(key);
- }
-
- @Override
- public KeyValueIterator<K, V> range(final K from,
- final K to) {
- return wrapped().range(from, to);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return wrapped().all();
- }
-
- @Override
- public long approximateNumEntries() {
- return wrapped().approximateNumEntries();
- }
-
- @Override
- public void put(final K key,
- final V value) {
- wrapped().put(key, value);
- }
-
- @Override
- public V putIfAbsent(final K key,
- final V value) {
- return wrapped().putIfAbsent(key, value);
- }
-
- @Override
- public void putAll(final List<KeyValue<K, V>> entries) {
- wrapped().putAll(entries);
- }
-
- @Override
- public V delete(final K key) {
- return wrapped().delete(key);
- }
+ @Override
+ public long timestamp() {
+ throwUnsupportedOperationExceptionIfStandby("timestamp");
+ return super.timestamp();
}
- static class TimestampedKeyValueStoreReadWriteDecorator<K, V>
- extends KeyValueStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
- implements TimestampedKeyValueStore<K, V> {
-
- TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) {
- super(inner);
- }
+ @Override
+ public ProcessorNode<?, ?> currentNode() {
+ throwUnsupportedOperationExceptionIfStandby("currentNode");
+ return super.currentNode();
}
- static class WindowStoreReadWriteDecorator<K, V>
- extends StateStoreReadWriteDecorator<WindowStore<K, V>, K, V>
- implements WindowStore<K, V> {
-
- WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
- super(inner);
- }
-
- @Deprecated
- @Override
- public void put(final K key,
- final V value) {
- wrapped().put(key, value);
- }
-
- @Override
- public void put(final K key,
- final V value,
- final long windowStartTimestamp) {
- wrapped().put(key, value, windowStartTimestamp);
- }
-
- @Override
- public V fetch(final K key,
- final long time) {
- return wrapped().fetch(key, time);
- }
-
- @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
- @Override
- public WindowStoreIterator<V> fetch(final K key,
- final long timeFrom,
- final long timeTo) {
- return wrapped().fetch(key, timeFrom, timeTo);
- }
-
- @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
- @Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from,
- final K to,
- final long timeFrom,
- final long timeTo) {
- return wrapped().fetch(from, to, timeFrom, timeTo);
- }
-
- @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
- @Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
- final long timeTo) {
- return wrapped().fetchAll(timeFrom, timeTo);
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> all() {
- return wrapped().all();
- }
+ @Override
+ public void setRecordContext(final ProcessorRecordContext recordContext) {
+ throwUnsupportedOperationExceptionIfStandby("setRecordContext");
+ super.setRecordContext(recordContext);
}
- static class TimestampedWindowStoreReadWriteDecorator<K, V>
- extends WindowStoreReadWriteDecorator<K, ValueAndTimestamp<V>>
- implements TimestampedWindowStore<K, V> {
-
- TimestampedWindowStoreReadWriteDecorator(final TimestampedWindowStore<K, V> inner) {
- super(inner);
- }
+ @Override
+ public ProcessorRecordContext recordContext() {
+ throwUnsupportedOperationExceptionIfStandby("recordContext");
+ return super.recordContext();
}
- static class SessionStoreReadWriteDecorator<K, AGG>
- extends StateStoreReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
- implements SessionStore<K, AGG> {
-
- SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
- super(inner);
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
- final long earliestSessionEndTime,
- final long latestSessionStartTime) {
- return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
- final K keyTo,
- final long earliestSessionEndTime,
- final long latestSessionStartTime) {
- return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
- }
-
- @Override
- public void remove(final Windowed<K> sessionKey) {
- wrapped().remove(sessionKey);
- }
-
- @Override
- public void put(final Windowed<K> sessionKey,
- final AGG aggregate) {
- wrapped().put(sessionKey, aggregate);
- }
-
- @Override
- public AGG fetchSession(final K key,
- final long startTime,
- final long endTime) {
- return wrapped().fetchSession(key, startTime, endTime);
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
- return wrapped().fetch(key);
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
- final K to) {
- return wrapped().fetch(from, to);
+ private void throwUnsupportedOperationExceptionIfStandby(final String operationName) {
+ if (taskType() == TaskType.STANDBY) {
+ throw new UnsupportedOperationException(
+ "this should not happen: " + operationName + "() is not supported in standby tasks.");
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index a3ab881..f7c1936 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -345,8 +345,8 @@ public class ProcessorStateManager implements StateManager {
return sourcePartitions.contains(partition);
}
- // used by the changelog reader only
- TaskType taskType() {
+ @Override
+ public TaskType taskType() {
return taskType;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
deleted file mode 100644
index 9a94ad6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.time.Duration;
-
-class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
-
- StandbyContextImpl(final TaskId id,
- final StreamsConfig config,
- final ProcessorStateManager stateMgr,
- final StreamsMetricsImpl metrics) {
- super(
- id,
- config,
- metrics,
- stateMgr,
- new ThreadCache(
- new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
- 0,
- metrics
- )
- );
- }
-
-
- StateManager getStateMgr() {
- return stateManager;
- }
-
- @Override
- public RecordCollector recordCollector() {
- // return null collector specifically since in standby task it should not be called;
- // if ever then we would throw NPE, which should never happen
- return null;
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- public StateStore getStateStore(final String name) {
- throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- public String topic() {
- throw new UnsupportedOperationException("this should not happen: topic() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- public int partition() {
- throw new UnsupportedOperationException("this should not happen: partition() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- public long offset() {
- throw new UnsupportedOperationException("this should not happen: offset() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- public long timestamp() {
- throw new UnsupportedOperationException("this should not happen: timestamp() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- public <K, V> void forward(final K key, final V value) {
- throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- public <K, V> void forward(final K key, final V value, final To to) {
- throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- @Deprecated
- public <K, V> void forward(final K key, final V value, final int childIndex) {
- throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- @Deprecated
- public <K, V> void forward(final K key, final V value, final String childName) {
- throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- public void commit() {
- throw new UnsupportedOperationException("this should not happen: commit() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- @Deprecated
- public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
- throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException {
- throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- public ProcessorRecordContext recordContext() {
- throw new UnsupportedOperationException("this should not happen: recordContext not supported in standby tasks.");
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- public void setRecordContext(final ProcessorRecordContext recordContext) {
- throw new UnsupportedOperationException("this should not happen: setRecordContext not supported in standby tasks.");
- }
-
- @Override
- public void setCurrentNode(final ProcessorNode currentNode) {
- // no-op. can't throw as this is called on commit when the StateStores get flushed.
- }
-
- /**
- * @throws UnsupportedOperationException on every invocation
- */
- @Override
- public ProcessorNode currentNode() {
- throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 83e0d73..b4abd79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -70,7 +70,7 @@ public class StandbyTask extends AbstractTask implements Task {
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());
- processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
+ processorContext = new ProcessorContextImpl(id, config, stateMgr, metrics);
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
eosEnabled = StreamThread.eosEnabled(config);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
index 0cb49754..674ea18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.StateStore;
import java.io.File;
import java.io.IOException;
import java.util.Map;
-
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
interface StateManager {
File baseDir();
@@ -46,6 +46,8 @@ interface StateManager {
void close() throws IOException;
+ TaskType taskType();
+
// TODO: we can remove this when consolidating global state manager into processor state manager
StateStore getGlobalStore(final String name);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 14f4e54..8026b04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -69,7 +69,7 @@ public class CachingKeyValueStore
private void initInternal(final ProcessorContext context) {
this.context = (InternalProcessorContext) context;
- this.cache = this.context.getCache();
+ this.cache = this.context.cache();
this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), name());
cache.addDirtyEntryFlushListener(cacheName, entries -> {
for (final ThreadCache.DirtyEntry entry : entries) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index f537d4c..4976ef1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -72,7 +72,7 @@ class CachingSessionStore
this.context = context;
cacheName = context.taskId() + "-" + name();
- cache = context.getCache();
+ cache = context.cache();
cache.addDirtyEntryFlushListener(cacheName, entries -> {
for (final ThreadCache.DirtyEntry entry : entries) {
putAndMaybeForward(entry, context);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index d2bd02e..e71f87e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -84,7 +84,7 @@ class CachingWindowStore
Serdes.Bytes(),
Serdes.ByteArray());
name = context.taskId() + "-" + name();
- cache = this.context.getCache();
+ cache = this.context.cache();
cache.addDirtyEntryFlushListener(name, entries -> {
for (final ThreadCache.DirtyEntry entry : entries) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index a924af6..35f6d36 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -16,15 +16,13 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
import java.util.List;
@@ -32,7 +30,7 @@ public class ChangeLoggingKeyValueBytesStore
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]>
implements KeyValueStore<Bytes, byte[]> {
- StoreChangeLogger<Bytes, byte[]> changeLogger;
+ InternalProcessorContext context;
ChangeLoggingKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) {
super(inner);
@@ -42,11 +40,7 @@ public class ChangeLoggingKeyValueBytesStore
public void init(final ProcessorContext context,
final StateStore root) {
super.init(context, root);
- final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
- changeLogger = new StoreChangeLogger<>(
- name(),
- context,
- new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
+ this.context = (InternalProcessorContext) context;
// if the inner store is an LRU cache, add the eviction listener to log removed record
if (wrapped() instanceof MemoryLRUCache) {
@@ -113,6 +107,6 @@ public class ChangeLoggingKeyValueBytesStore
void log(final Bytes key,
final byte[] value) {
- changeLogger.logChange(key, value);
+ context.logChange(name(), key, value, context.timestamp());
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 361f8a5..cc586d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -16,15 +16,13 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.StateSerdes;
/**
* Simple wrapper around a {@link SessionStore} to support writing
@@ -34,7 +32,7 @@ class ChangeLoggingSessionBytesStore
extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]>
implements SessionStore<Bytes, byte[]> {
- private StoreChangeLogger<Bytes, byte[]> changeLogger;
+ private InternalProcessorContext context;
ChangeLoggingSessionBytesStore(final SessionStore<Bytes, byte[]> bytesStore) {
super(bytesStore);
@@ -43,16 +41,9 @@ class ChangeLoggingSessionBytesStore
@Override
public void init(final ProcessorContext context, final StateStore root) {
super.init(context, root);
- final String topic = ProcessorStateManager.storeChangelogTopic(
- context.applicationId(),
- name());
- changeLogger = new StoreChangeLogger<>(
- name(),
- context,
- new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
+ this.context = (InternalProcessorContext) context;
}
-
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) {
return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
@@ -66,14 +57,13 @@ class ChangeLoggingSessionBytesStore
@Override
public void remove(final Windowed<Bytes> sessionKey) {
wrapped().remove(sessionKey);
- changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), null);
+ context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp());
}
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
wrapped().put(sessionKey, aggregate);
- changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), aggregate);
-
+ context.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, context.timestamp());
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
index 02e4c6a..7cdac97 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
@@ -32,9 +32,9 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey
void log(final Bytes key,
final byte[] valueAndTimestamp) {
if (valueAndTimestamp != null) {
- changeLogger.logChange(key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp));
+ context.logChange(name(), key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp));
} else {
- changeLogger.logChange(key, null);
+ context.logChange(name(), key, null, context.timestamp());
}
}
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
index 94362d4..3714150 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
@@ -33,9 +33,9 @@ class ChangeLoggingTimestampedWindowBytesStore extends ChangeLoggingWindowBytesS
void log(final Bytes key,
final byte[] valueAndTimestamp) {
if (valueAndTimestamp != null) {
- changeLogger.logChange(key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp));
+ context.logChange(name(), key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp));
} else {
- changeLogger.logChange(key, null);
+ context.logChange(name(), key, null, context.timestamp());
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 8a9b91a..a04eb2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -16,14 +16,12 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -36,11 +34,9 @@ class ChangeLoggingWindowBytesStore
implements WindowStore<Bytes, byte[]> {
private final boolean retainDuplicates;
- private ProcessorContext context;
+ InternalProcessorContext context;
private int seqnum = 0;
- StoreChangeLogger<Bytes, byte[]> changeLogger;
-
ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore,
final boolean retainDuplicates) {
super(bytesStore);
@@ -50,13 +46,8 @@ class ChangeLoggingWindowBytesStore
@Override
public void init(final ProcessorContext context,
final StateStore root) {
- this.context = context;
+ this.context = (InternalProcessorContext) context;
super.init(context, root);
- final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
- changeLogger = new StoreChangeLogger<>(
- name(),
- context,
- new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
}
@Override
@@ -114,7 +105,7 @@ class ChangeLoggingWindowBytesStore
void log(final Bytes key,
final byte[] value) {
- changeLogger.logChange(key, value);
+ context.logChange(name(), key, value, context.timestamp());
}
private int maybeUpdateSeqnumForDups() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
deleted file mode 100644
index 7358120..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.StateSerdes;
-
-/**
- * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
- * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
- * i.e. use {@code StoreChangeLogger<Bytes, ...>} rather than {@code StoreChangeLogger<byte[], ...>}.
- *
- * @param <K>
- * @param <V>
- */
-class StoreChangeLogger<K, V> {
-
- private final String topic;
- private final int partition;
- private final ProcessorContext context;
- private final RecordCollector collector;
- private final Serializer<K> keySerializer;
- private final Serializer<V> valueSerializer;
-
- StoreChangeLogger(final String storeName,
- final ProcessorContext context,
- final StateSerdes<K, V> serialization) {
- this(storeName, context, context.taskId().partition, serialization);
- }
-
- private StoreChangeLogger(final String storeName,
- final ProcessorContext context,
- final int partition,
- final StateSerdes<K, V> serialization) {
- topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
- this.context = context;
- this.partition = partition;
- this.collector = ((RecordCollector.Supplier) context).recordCollector();
- keySerializer = serialization.keySerializer();
- valueSerializer = serialization.valueSerializer();
- }
-
- void logChange(final K key,
- final V value) {
- logChange(key, value, context.timestamp());
- }
-
- void logChange(final K key,
- final V value,
- final long timestamp) {
- // Sending null headers to changelog topics (KIP-244)
- collector.send(topic, key, value, null, partition, timestamp, keySerializer, valueSerializer);
- }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 4a3ee7b..72b415f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Cancellable;
@@ -180,7 +181,7 @@ public class AbstractProcessorContextTest {
public void appConfigsShouldReturnUnrecognizedValues() {
assertThat(
context.appConfigs().get("user.supplied.config"),
- equalTo("user-suppplied-value"));
+ equalTo("user-supplied-value"));
}
@@ -190,7 +191,7 @@ public class AbstractProcessorContextTest {
config = getStreamsConfig();
// Value must be a string to test className -> class conversion
config.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBConfigSetter.class.getName());
- config.put("user.supplied.config", "user-suppplied-value");
+ config.put("user.supplied.config", "user-supplied-value");
}
TestProcessorContext(final MockStreamsMetrics metrics) {
@@ -233,5 +234,12 @@ public class AbstractProcessorContextTest {
@Override
public void commit() {}
+
+ @Override
+ public void logChange(final String storeName,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp) {
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 8443e55..e4fe6ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@@ -72,6 +73,7 @@ public class GlobalProcessorContextImplTest {
expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME)).andReturn(mock(TimestampedWindowStore.class));
expect(stateManager.getGlobalStore(GLOBAL_SESSION_STORE_NAME)).andReturn(mock(SessionStore.class));
expect(stateManager.getGlobalStore(UNKNOWN_STORE)).andReturn(null);
+ expect(stateManager.taskType()).andStubReturn(TaskType.GLOBAL);
replay(stateManager);
globalContext = new GlobalProcessorContextImpl(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 5b52e9d..41cfdfa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -16,13 +16,18 @@
*/
package org.apache.kafka.streams.processor.internals;
+import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -33,6 +38,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
@@ -43,6 +49,8 @@ import java.util.List;
import java.util.function.Consumer;
import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.BYTES_KEY_SERIALIZER;
+import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.BYTEARRAY_VALUE_SERIALIZER;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
@@ -50,13 +58,19 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ProcessorContextImplTest {
private ProcessorContextImpl context;
+ private final StreamsConfig streamsConfig = streamsConfigMock();
+
+ private RecordCollector recordCollector = mock(RecordCollector.class);
+
private static final String KEY = "key";
private static final long VALUE = 42L;
private static final ValueAndTimestamp<Long> VALUE_AND_TIMESTAMP = ValueAndTimestamp.make(42L, 21L);
@@ -99,13 +113,8 @@ public class ProcessorContextImplTest {
timestampedIters.add(i, mock(KeyValueIterator.class));
}
- final StreamsConfig streamsConfig = mock(StreamsConfig.class);
- expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("add-id");
- expect(streamsConfig.defaultValueSerde()).andReturn(Serdes.ByteArray());
- expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray());
- replay(streamsConfig);
-
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
+ expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock());
expect(stateManager.getGlobalStore("GlobalTimestampedKeyValueStore")).andReturn(timestampedKeyValueStoreMock());
@@ -125,7 +134,7 @@ public class ProcessorContextImplTest {
mock(TaskId.class),
mock(StreamTask.class),
streamsConfig,
- mock(RecordCollector.class),
+ recordCollector,
stateManager,
mock(StreamsMetricsImpl.class),
mock(ThreadCache.class)
@@ -140,6 +149,18 @@ public class ProcessorContextImplTest {
"LocalSessionStore"))));
}
+ private ProcessorContextImpl getStandbyContext() {
+ final ProcessorStateManager stateManager = EasyMock.createNiceMock(ProcessorStateManager.class);
+ expect(stateManager.taskType()).andStubReturn(TaskType.STANDBY);
+ replay(stateManager);
+ return new ProcessorContextImpl(
+ mock(TaskId.class),
+ streamsConfig,
+ stateManager,
+ mock(StreamsMetricsImpl.class)
+ );
+ }
+
@Test
public void globalKeyValueStoreShouldBeReadOnly() {
doTest("GlobalKeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> {
@@ -347,6 +368,165 @@ public class ProcessorContextImplTest {
});
}
+ @Test
+ public void shouldNotSendRecordHeadersToChangelogTopic() {
+ final Bytes key = Bytes.wrap("key".getBytes());
+ final byte[] value = "zero".getBytes();
+
+ recordCollector.send(null, key, value, null, 0, 42L, BYTES_KEY_SERIALIZER, BYTEARRAY_VALUE_SERIALIZER);
+
+ replay(recordCollector);
+ context.logChange("Store", key, value, 42L);
+
+ verify(recordCollector);
+ }
+
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnLogChange() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.logChange("Store", Bytes.wrap("k".getBytes()), null, 0L)
+ );
+ }
+
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnGetStateStore() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.getStateStore("store")
+ );
+ }
+
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnForward() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.forward("key", "value")
+ );
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnForwardWithChildIndex() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.forward("key", "value", 0)
+ );
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnForwardWithChildName() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.forward("key", "value", "child-name")
+ );
+ }
+
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.forward("key", "value", To.child("child-name"))
+ );
+ }
+
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnCommit() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.commit()
+ );
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnScheduleWithInterval() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.schedule(100L, PunctuationType.STREAM_TIME, t -> { })
+ );
+ }
+
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnSchedule() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, t -> { })
+ );
+ }
+
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnTopic() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.topic()
+ );
+ }
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnPartition() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.partition()
+ );
+ }
+
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnOffset() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.offset()
+ );
+ }
+
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnTimestamp() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.timestamp()
+ );
+ }
+
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnCurrentNode() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.currentNode()
+ );
+ }
+
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnSetRecordContext() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.setRecordContext(mock(ProcessorRecordContext.class))
+ );
+ }
+
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnRecordContext() {
+ context = getStandbyContext();
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> context.recordContext()
+ );
+ }
+
@SuppressWarnings("unchecked")
private KeyValueStore<String, Long> keyValueStoreMock() {
final KeyValueStore<String, Long> keyValueStoreMock = mock(KeyValueStore.class);
@@ -511,6 +691,15 @@ public class ProcessorContextImplTest {
return sessionStore;
}
+ private StreamsConfig streamsConfigMock() {
+ final StreamsConfig streamsConfig = mock(StreamsConfig.class);
+ expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andStubReturn("add-id");
+ expect(streamsConfig.defaultValueSerde()).andStubReturn(Serdes.ByteArray());
+ expect(streamsConfig.defaultKeySerde()).andStubReturn(Serdes.ByteArray());
+ replay(streamsConfig);
+ return streamsConfig;
+ }
+
private void initStateStoreMock(final StateStore stateStore) {
expect(stateStore.name()).andReturn(STORE_NAME);
expect(stateStore.persistent()).andReturn(true);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java
index 45e0165..44f01b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.junit.Before;
@@ -43,14 +44,18 @@ public class ProcessorContextTest {
expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("add-id");
expect(streamsConfig.defaultValueSerde()).andReturn(Serdes.ByteArray());
expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray());
- replay(streamsConfig);
+
+ final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
+ expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE);
+
+ replay(streamsConfig, stateManager);
context = new ProcessorContextImpl(
mock(TaskId.class),
mock(StreamTask.class),
streamsConfig,
mock(RecordCollector.class),
- mock(ProcessorStateManager.class),
+ stateManager,
mock(StreamsMetricsImpl.class),
mock(ThreadCache.class)
);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index c993d1d..7c14f23 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
@@ -144,7 +145,9 @@ public class StandbyTaskTest {
public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false);
- EasyMock.replay(stateDirectory);
+ EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.STANDBY);
+
+ EasyMock.replay(stateDirectory, stateManager);
task = createStandbyTask();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
index bf9abee..cf1eafd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateStore;
import java.io.File;
import java.util.Map;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
public class StateManagerStub implements StateManager {
@@ -59,4 +60,9 @@ public class StateManagerStub implements StateManager {
@Override
public void checkpoint(final Map<TopicPartition, Long> offsets) {}
+ @Override
+ public TaskType taskType() {
+ return null;
+ }
+
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 89740c3..7a39121 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -249,7 +249,7 @@ public class KeyValueStoreTestDriver<K, V> {
final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1024 * 1024L, metrics());
@Override
- public ThreadCache getCache() {
+ public ThreadCache cache() {
return cache;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 79d66bf..5ab035c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -31,12 +31,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-
@RunWith(EasyMockRunner.class)
public class ChangeLoggingSessionBytesStoreTest {
@@ -75,13 +69,15 @@ public class ChangeLoggingSessionBytesStoreTest {
init();
- store.put(key1, value1);
+ final Bytes binaryKey = SessionKeySchema.toBinary(key1);
- assertThat(collector.collected().size(), equalTo(1));
- assertThat(collector.collected().get(0).key(), equalTo(SessionKeySchema.toBinary(key1)));
- assertThat(collector.collected().get(0).value(), equalTo(value1));
+ EasyMock.reset(context);
+ context.logChange(store.name(), binaryKey, value1, 0L);
- EasyMock.verify(inner);
+ EasyMock.replay(context);
+ store.put(key1, value1);
+
+ EasyMock.verify(inner, context);
}
@Test
@@ -93,11 +89,14 @@ public class ChangeLoggingSessionBytesStoreTest {
store.remove(key1);
final Bytes binaryKey = SessionKeySchema.toBinary(key1);
- assertThat(collector.collected().size(), equalTo(1));
- assertThat(collector.collected().get(0).key(), equalTo(binaryKey));
- assertThat(collector.collected().get(0).value(), nullValue());
- EasyMock.verify(inner);
+ EasyMock.reset(context);
+ context.logChange(store.name(), binaryKey, null, 0L);
+
+ EasyMock.replay(context);
+ store.remove(key1);
+
+ EasyMock.verify(inner, context);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index bde6d05..4a240b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -31,9 +31,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import static java.time.Instant.ofEpochMilli;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
@RunWith(EasyMockRunner.class)
public class ChangeLoggingTimestampedWindowBytesStoreTest {
@@ -75,15 +72,15 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
init();
- store.put(bytesKey, valueAndTimestamp);
-
final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
- assertThat(collector.collected().size(), equalTo(1));
- assertThat(collector.collected().get(0).key(), equalTo(key));
- assertThat(collector.collected().get(0).value(), equalTo(value));
- assertThat(collector.collected().get(0).timestamp(), equalTo(42L));
- EasyMock.verify(inner);
+ EasyMock.reset(context);
+ context.logChange(store.name(), key, value, 42);
+
+ EasyMock.replay(context);
+ store.put(bytesKey, valueAndTimestamp);
+
+ EasyMock.verify(inner, context);
}
@Test
@@ -118,20 +115,20 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
EasyMock.expectLastCall().times(2);
init();
- store.put(bytesKey, valueAndTimestamp);
- store.put(bytesKey, valueAndTimestamp);
final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
- assertThat(collector.collected().size(), equalTo(2));
- assertThat(collector.collected().get(0).key(), equalTo(key1));
- assertThat(collector.collected().get(0).value(), equalTo(value));
- assertThat(collector.collected().get(0).timestamp(), equalTo(42L));
- assertThat(collector.collected().get(1).key(), equalTo(key2));
- assertThat(collector.collected().get(1).value(), equalTo(value));
- assertThat(collector.collected().get(1).timestamp(), equalTo(42L));
- EasyMock.verify(inner);
+ EasyMock.reset(context);
+ context.logChange(store.name(), key1, value, 42L);
+ context.logChange(store.name(), key2, value, 42L);
+
+ EasyMock.replay(context);
+
+ store.put(bytesKey, valueAndTimestamp);
+ store.put(bytesKey, valueAndTimestamp);
+
+ EasyMock.verify(inner, context);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index ce60548..f4cb523 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -31,9 +31,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import static java.time.Instant.ofEpochMilli;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
@RunWith(EasyMockRunner.class)
public class ChangeLoggingWindowBytesStoreTest {
@@ -74,15 +71,16 @@ public class ChangeLoggingWindowBytesStoreTest {
init();
- store.put(bytesKey, value);
-
final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
- assertThat(collector.collected().size(), equalTo(1));
- assertThat(collector.collected().get(0).key(), equalTo(key));
- assertThat(collector.collected().get(0).value(), equalTo(value));
- assertThat(collector.collected().get(0).timestamp(), equalTo(0L));
- EasyMock.verify(inner);
+ EasyMock.reset(context);
+ EasyMock.expect(context.timestamp()).andStubReturn(0L);
+ context.logChange(store.name(), key, value, 0L);
+
+ EasyMock.replay(context);
+ store.put(bytesKey, value);
+
+ EasyMock.verify(inner, context);
}
@Test
@@ -113,24 +111,26 @@ public class ChangeLoggingWindowBytesStoreTest {
@SuppressWarnings("deprecation")
public void shouldRetainDuplicatesWhenSet() {
store = new ChangeLoggingWindowBytesStore(inner, true);
+
inner.put(bytesKey, value, 0);
EasyMock.expectLastCall().times(2);
init();
- store.put(bytesKey, value);
- store.put(bytesKey, value);
final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
- assertThat(collector.collected().size(), equalTo(2));
- assertThat(collector.collected().get(0).key(), equalTo(key1));
- assertThat(collector.collected().get(0).value(), equalTo(value));
- assertThat(collector.collected().get(0).timestamp(), equalTo(0L));
- assertThat(collector.collected().get(1).key(), equalTo(key2));
- assertThat(collector.collected().get(1).value(), equalTo(value));
- assertThat(collector.collected().get(1).timestamp(), equalTo(0L));
- EasyMock.verify(inner);
+ EasyMock.reset(context);
+ EasyMock.expect(context.timestamp()).andStubReturn(0L);
+ context.logChange(store.name(), key1, value, 0L);
+ context.logChange(store.name(), key2, value, 0L);
+
+ EasyMock.replay(context);
+
+ store.put(bytesKey, value);
+ store.put(bytesKey, value);
+
+ EasyMock.verify(inner, context);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
deleted file mode 100644
index c5a89da..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-
-import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockRecordCollector;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-
-public class StoreChangeLoggerTest {
-
- private final String topic = "topic";
-
- private final MockRecordCollector collector = new MockRecordCollector();
- private final InternalMockProcessorContext context = new InternalMockProcessorContext(
- StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
- collector);
-
- private final StoreChangeLogger<Integer, String> changeLogger =
- new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class));
-
- @Test
- public void testAddRemove() {
- context.setTime(1);
- changeLogger.logChange(0, "zero");
- context.setTime(5);
- changeLogger.logChange(1, "one");
- changeLogger.logChange(2, "two");
- changeLogger.logChange(3, "three", 42L);
- context.setTime(9);
- changeLogger.logChange(0, null);
-
- assertThat(collector.collected().size(), equalTo(5));
- assertThat(collector.collected().get(0).key(), equalTo(0));
- assertThat(collector.collected().get(0).value(), equalTo("zero"));
- assertThat(collector.collected().get(0).timestamp(), equalTo(1L));
- assertThat(collector.collected().get(1).key(), equalTo(1));
- assertThat(collector.collected().get(1).value(), equalTo("one"));
- assertThat(collector.collected().get(1).timestamp(), equalTo(5L));
- assertThat(collector.collected().get(2).key(), equalTo(2));
- assertThat(collector.collected().get(2).value(), equalTo("two"));
- assertThat(collector.collected().get(2).timestamp(), equalTo(5L));
- assertThat(collector.collected().get(3).key(), equalTo(3));
- assertThat(collector.collected().get(3).value(), equalTo("three"));
- assertThat(collector.collected().get(3).timestamp(), equalTo(42L));
- assertThat(collector.collected().get(4).key(), equalTo(0));
- assertThat(collector.collected().get(4).value(), nullValue());
- assertThat(collector.collected().get(4).timestamp(), equalTo(9L));
- }
-
- @Test
- public void shouldNotSendRecordHeadersToChangelogTopic() {
- context.headers().add(new RecordHeader("key", "value".getBytes()));
- changeLogger.logChange(0, "zero", 42L);
-
- assertThat(collector.collected().size(), equalTo(1));
- assertThat(collector.collected().get(0).key(), equalTo(0));
- assertThat(collector.collected().get(0).value(), equalTo("zero"));
- assertThat(collector.collected().get(0).timestamp(), equalTo(42L));
- assertThat(collector.collected().get(0).headers().toArray(), equalTo(Record.EMPTY_HEADERS));
- }
-}
diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
index 3def253..ae825bc 100644
--- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import java.io.File;
import java.util.Map;
import java.util.Set;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
public class GlobalStateManagerStub implements GlobalStateManager {
@@ -82,4 +83,9 @@ public class GlobalStateManagerStub implements GlobalStateManager {
public Map<TopicPartition, Long> changelogOffsets() {
return offsets;
}
+
+ @Override
+ public TaskType taskType() {
+ return TaskType.GLOBAL;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 8cd784f..81876e4 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@@ -38,6 +39,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.ToInternal;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StateSerdes;
@@ -349,6 +351,27 @@ public class InternalMockProcessorContext
return recordContext.headers();
}
+ @Override
+ public TaskType taskType() {
+ return TaskType.ACTIVE;
+ }
+
+ @Override
+ public void logChange(final String storeName,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp) {
+ recordCollector().send(
+ storeName + "-changelog",
+ key,
+ value,
+ null,
+ taskId().partition,
+ timestamp,
+ BYTES_KEY_SERIALIZER,
+ BYTEARRAY_VALUE_SERIALIZER);
+ }
+
public StateRestoreListener getRestoreListener(final String storeName) {
return getStateRestoreListener(restoreFuncs.get(storeName));
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 4b7cf49..e375085 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.test;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -24,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -38,6 +40,7 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
private ProcessorNode currentNode;
private RecordCollector recordCollector;
private long currentSystemTimeMs;
+ private TaskType taskType = TaskType.ACTIVE;
public MockInternalProcessorContext() {
}
@@ -88,7 +91,7 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
}
@Override
- public ThreadCache getCache() {
+ public ThreadCache cache() {
return null;
}
@@ -116,4 +119,16 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
public StateRestoreCallback stateRestoreCallback(final String storeName) {
return restoreCallbacks.get(storeName);
}
+
+ @Override
+ public TaskType taskType() {
+ return taskType;
+ }
+
+ @Override
+ public void logChange(final String storeName,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp) {
+ }
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 77dd418..da8b7b4 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -17,6 +17,7 @@
package org.apache.kafka.test;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
@@ -32,6 +33,7 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
public class NoOpProcessorContext extends AbstractProcessorContext {
public boolean initialized;
@@ -101,5 +103,18 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
@Override
public void register(final StateStore store,
- final StateRestoreCallback stateRestoreCallback) {}
+ final StateRestoreCallback stateRestoreCallback) {
+ }
+
+ @Override
+ public TaskType taskType() {
+ return TaskType.ACTIVE;
+ }
+
+ @Override
+ public void logChange(final String storeName,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp) {
+ }
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index e78f966..8475172 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -991,7 +991,7 @@ public class TopologyTestDriver implements Closeable {
private StateStore getStateStore(final String name,
final boolean throwForBuiltInStores) {
if (task != null) {
- final StateStore stateStore = ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
+ final StateStore stateStore = ((ProcessorContextImpl) task.context()).stateManager().getStore(name);
if (stateStore != null) {
if (throwForBuiltInStores) {
throwIfBuiltInStore(stateStore);
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 73da6ef..b16eb32 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -75,6 +75,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
private final List<CapturedForward> capturedForwards = new LinkedList<>();
private boolean committed = false;
+
/**
* {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
*/