Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/18 13:58:16 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #8669: MINOR: consolidate processor context for active/standby

cadonna commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r426592363



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -16,87 +16,129 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.streams.KeyValue;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.internals.ApiUtils;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 import java.time.Duration;
 import java.util.List;
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
+import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore;
 
 public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
+    public static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
+    public static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
 
-    private final StreamTask task;
+    // The below are both null for standby tasks
+    private final StreamTask streamTask;
     private final RecordCollector collector;
+
     private final ToInternal toInternal = new ToInternal();
     private final static To SEND_TO_ALL = To.all();
 
+    final Map<String, String> storeToChangelogTopic = new HashMap<>();
+
     ProcessorContextImpl(final TaskId id,
-                         final StreamTask task,
+                         final StreamTask streamTask,
                          final StreamsConfig config,
                          final RecordCollector collector,
                          final ProcessorStateManager stateMgr,
                          final StreamsMetricsImpl metrics,
                          final ThreadCache cache) {
         super(id, config, metrics, stateMgr, cache);
-        this.task = task;
+        this.streamTask = streamTask;
         this.collector = collector;
+
+        if (streamTask == null && taskType() == TaskType.ACTIVE) {
+            throw new IllegalStateException("Tried to create context for active task but the streamtask was null");
+        }
+    }
+
+    ProcessorContextImpl(final TaskId id,
+                         final StreamsConfig config,
+                         final ProcessorStateManager stateMgr,
+                         final StreamsMetricsImpl metrics) {
+        this(
+            id,
+            null,
+            config,
+            null,
+            stateMgr,
+            metrics,
+            new ThreadCache(
+                new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
+                0,
+                metrics
+            )
+        );
     }
 
-    public ProcessorStateManager getStateMgr() {
+    public ProcessorStateManager stateManager() {
         return (ProcessorStateManager) stateManager;
     }
 
+    @Override
+    public void register(final StateStore store,
+                         final StateRestoreCallback stateRestoreCallback) {
+        storeToChangelogTopic.put(store.name(), ProcessorStateManager.storeChangelogTopic(applicationId(), store.name()));
+        super.register(store, stateRestoreCallback);
+    }
+
     @Override
     public RecordCollector recordCollector() {
         return collector;
     }
 
+    @Override
+    public void logChange(final String storeName,
+                          final Bytes key,
+                          final byte[] value,
+                          final long timestamp) {
+        throwUnsupportedOperationExceptionIfStandby("logChange");

Review comment:
      req: Please add unit tests for a processor context of a standby task that cover the unsupported methods, i.e., all code paths that call `throwUnsupportedOperationExceptionIfStandby()`.
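
For illustration, here is a hedged sketch of one such test (not part of the PR): it constructs a context via the new standby constructor, mocks the state manager to report `TaskType.STANDBY`, and asserts that `logChange()` throws. The Mockito usage, JUnit 4.13's `assertThrows`, the `StreamsMetricsImpl` constructor arguments, and the assumption that `taskType()` is derived from the state manager are editorial guesses, not code from the PR:

```java
package org.apache.kafka.streams.processor.internals;

import java.util.Properties;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.junit.Test;

import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ProcessorContextImplStandbyTest {

    @Test
    public void shouldThrowUnsupportedOperationExceptionOnLogChangeForStandby() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Assumption: taskType() on the context delegates to the state
        // manager, as the null check in the active-task constructor suggests.
        final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
        when(stateManager.taskType()).thenReturn(TaskType.STANDBY);

        // Use the standby constructor added in this PR.
        final ProcessorContextImpl context = new ProcessorContextImpl(
            new TaskId(0, 0),
            new StreamsConfig(props),
            stateManager,
            new StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST)
        );

        // Every method guarded by throwUnsupportedOperationExceptionIfStandby()
        // should get an assertion like this one.
        assertThrows(
            UnsupportedOperationException.class,
            () -> context.logChange("store", Bytes.wrap("key".getBytes()), "value".getBytes(), 0L)
        );
    }
}
```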

##########
File path: streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
##########
@@ -52,6 +54,8 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.KEY_SERIALIZER;
+import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.VALUE_SERIALIZER;

Review comment:
      prop: I think we should move those to `InternalProcessorContext`. IMO, it would be cleaner for a mock not to have a direct dependency on the class it mocks. The new consolidated mock for the internal processor context will not extend `AbstractProcessorContext` but only `MockProcessorContext`; thus, the common ancestor will be `InternalProcessorContext`.
   Sorry for bothering you about those constants after @vvcephei has already bothered you about them. :-)
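
For reference, a minimal sketch of where the constants could live under this proposal, assuming `InternalProcessorContext` is an interface; this is an illustration of the idea, not code from the PR:

```java
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.streams.processor.ProcessorContext;

public interface InternalProcessorContext extends ProcessorContext {

    // Interface fields are implicitly public static final, so both the mock
    // and ProcessorContextImpl can share them without depending on each other.
    BytesSerializer KEY_SERIALIZER = new BytesSerializer();
    ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();

    // ... existing members of InternalProcessorContext remain unchanged ...
}
```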

##########
File path: streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
##########
@@ -349,6 +353,27 @@ public Headers headers() {
         return recordContext.headers();
     }
 
+    @Override
+    public TaskType taskType() {
+        return TaskType.ACTIVE;
+    }
+
+    @Override
+    public void logChange(final String storeName,
+                          final Bytes key,
+                          final byte[] value,
+                          final long timestamp) {
+        recordCollector().send(

Review comment:
      Both `MockInternalProcessorContext` and `InternalMockProcessorContext` currently implement `RecordCollector.Supplier` (i.e., `recordCollector()`). Once rebased, the consolidated mock will of course also need to implement `taskType()` and `logChange()`.
   See also my comment above regarding `KEY_SERIALIZER` and `VALUE_SERIALIZER`.
   @guozhangwang did you have anything specific in mind that I did not cover here?   
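
To make the expectation concrete, here is a hedged sketch of the two overrides in a consolidated mock. The `send()` overload and the simplified `storeName + "-changelog"` topic name follow the snippet quoted above but are assumptions, not the PR's code:

```java
@Override
public TaskType taskType() {
    // The consolidated mock stands in for an active task.
    return TaskType.ACTIVE;
}

@Override
public void logChange(final String storeName,
                      final Bytes key,
                      final byte[] value,
                      final long timestamp) {
    // Forward the changelog record through the collector supplied via
    // RecordCollector.Supplier, using the shared serializers (ideally taken
    // from InternalProcessorContext, per the comment on the constants above).
    recordCollector().send(
        storeName + "-changelog",  // simplified changelog topic name
        key,
        value,
        null,                      // no headers
        taskId().partition,
        timestamp,
        KEY_SERIALIZER,
        VALUE_SERIALIZER
    );
}
```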



