Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/01 21:37:12 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498400505



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -49,7 +51,28 @@
      * Initializes this state store.
      * <p>
      * The implementation of this function must register the root store in the context via the
-     * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} function, where the
+     * {@link org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, StateRestoreCallback)} function,
+     * where the first {@link StateStore} parameter should always be the passed-in {@code root} object, and
+     * the second parameter should be an object of user's implementation
+     * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
+     * <p>
+     * Note that if the state store engine itself supports bulk writes, users can implement another
+     * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
+     * let users implement bulk-load restoration logic instead of restoring one record at a time.
+     * <p>
+     * This method is not called if {@link StateStore#init(StateStoreContext, StateStore)}
+     * is implemented.
+     *
+     * @throws IllegalStateException If store gets registered after initialized is already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);

Review comment:
       Not sure what's up with this diff, but this is the old API.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -88,9 +88,12 @@ void register(final StateStore store,
      * Get the state store given the store name.
      *
      * @param name The store name
+     * @param <S> The type or interface of the store to return
      * @return The state store instance
+     *
+     * @throws ClassCastException if the return type isn't a type or interface of the actual returned store.
      */
-    StateStore getStateStore(final String name);
+    <S extends StateStore> S getStateStore(final String name);

Review comment:
       Now that the same Impl can implement both the new and old contexts, we need this to resolve a clash between the old and new `getStateStore` methods. It's backward compatible, and a nice quality-of-life improvement anyway.
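
   A minimal sketch of what the generic signature gives callers. It uses hypothetical stand-in types (`Store`, `KeyValueStore`, `WindowStore`, `StoreRegistry`), not the real Streams interfaces. The assignment target picks `S`, so the call site needs no cast, and asking for the wrong type fails with a `ClassCastException` where the result is assigned, as the new Javadoc says.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical stand-in for StateStore.
   interface Store {
       String name();
   }

   // Hypothetical concrete store types.
   final class KeyValueStore implements Store {
       private final Map<String, Long> data = new HashMap<>();

       @Override
       public String name() {
           return "aggStore";
       }

       Long get(final String key) {
           return data.get(key);
       }

       void put(final String key, final Long value) {
           data.put(key, value);
       }
   }

   final class WindowStore implements Store {
       @Override
       public String name() {
           return "windowStore";
       }
   }

   // Hypothetical registry with the same getStateStore shape as the new signature.
   final class StoreRegistry {
       private final Map<String, Store> stores = new HashMap<>();

       void register(final Store store) {
           stores.put(store.name(), store);
       }

       // S erases to Store, so this cast checks nothing at runtime; a wrong S
       // fails with ClassCastException where the caller assigns the result.
       @SuppressWarnings("unchecked")
       <S extends Store> S getStateStore(final String name) {
           return (S) stores.get(name);
       }
   }
   ```

   For example, `KeyValueStore kv = registry.getStateStore("aggStore");` compiles without a cast. Assigning the same lookup to a `WindowStore` variable compiles too, but throws at runtime.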

##########
File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##########
@@ -160,24 +161,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
         @Override
         public void init(final ProcessorContext<String, Long> context) {
             this.context = context;
-            context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
-            context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
+            context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, this::flushStore);
+            context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, this::flushStore);
             store = context.getStateStore("aggStore");
         }
 
         @Override
-        public void process(final String key, final Long value) {
-            final Long oldValue = store.get(key);
-            if (oldValue == null || value > oldValue) {
-                store.put(key, value);
+        public void process(final Record<String, Long> record) {
+            final Long oldValue = store.get(record.key());
+            if (oldValue == null || record.value() > oldValue) {
+                store.put(record.key(), record.value());
             }
         }
 
-        private void flushStore() {
+        private void flushStore(final long timestamp) {
             final KeyValueIterator<String, Long> it = store.all();
             while (it.hasNext()) {
                 final KeyValue<String, Long> next = it.next();
-                context.forward(next.key, next.value);
+                context.forward(new Record<>(next.key, next.value, timestamp));

Review comment:
       Since we have to define a timestamp now, I'm showing the use of the punctuation time in the dev guide.
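
   A self-contained sketch of the same pattern. It assumes hypothetical stand-ins (`Punctuator`, `Rec`, `MaxAggregator`) instead of the real Streams classes. Because `flushStore(long)` has the same shape as a `punctuate(long)` callback, `this::flushStore` can be handed to the scheduler directly. Every flushed record is stamped with the punctuation time, since there is no input record to borrow a timestamp from.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical stand-in for a punctuation callback: receives the punctuation time.
   @FunctionalInterface
   interface Punctuator {
       void punctuate(long timestamp);
   }

   // Hypothetical minimal record with an explicit timestamp.
   final class Rec {
       final String key;
       final Long value;
       final long timestamp;

       Rec(final String key, final Long value, final long timestamp) {
           this.key = key;
           this.value = value;
           this.timestamp = timestamp;
       }
   }

   final class MaxAggregator {
       private final Map<String, Long> store = new TreeMap<>(); // sorted, so flush order is deterministic
       final List<Rec> forwarded = new ArrayList<>();            // stands in for context.forward(...)

       // The callback we would pass to schedule(...).
       Punctuator punctuator() {
           return this::flushStore;
       }

       // Keep the maximum value seen per key.
       void process(final Rec input) {
           final Long oldValue = store.get(input.key);
           if (oldValue == null || input.value > oldValue) {
               store.put(input.key, input.value);
           }
       }

       // No input record exists during a punctuation, so the punctuation
       // time becomes the timestamp of every output record.
       private void flushStore(final long timestamp) {
           for (final Map.Entry<String, Long> entry : store.entrySet()) {
               forwarded.add(new Rec(entry.getKey(), entry.getValue(), timestamp));
           }
       }
   }
   ```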

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -83,30 +90,21 @@
      */
     StreamsMetrics metrics();
 
-    /**
-     * Registers and possibly restores the specified storage engine.
-     *
-     * @param store the storage engine
-     * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
-     *
-     * @throws IllegalStateException If store gets registered after initialized is already finished
-     * @throws StreamsException if the store's change log does not contain the partition
-     */
-    void register(final StateStore store,
-                  final StateRestoreCallback stateRestoreCallback);
-

Review comment:
       Moved to the StateStoreContext.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -61,7 +84,9 @@
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void init(ProcessorContext context, StateStore root);
+    default void init(final StateStoreContext context, final StateStore root) {
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }

Review comment:
       This is the new API. Note the default implementation that delegates to the old API.
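
   The delegation pattern can be sketched with hypothetical stand-ins (`OldContext`, `NewContext`, `NewToOldAdapter`, `Store`, `LegacyStore`), which drop the `root` parameter for brevity. A store written only against the old `init` still works when a caller uses the new overload, because the default method adapts the context and forwards it to the old method.

   ```java
   // Hypothetical stand-in for the old ProcessorContext.
   interface OldContext {
       String applicationId();
   }

   // Hypothetical stand-in for the new StateStoreContext.
   interface NewContext {
       String applicationId();
   }

   // Hypothetical adapter in the spirit of StoreToProcessorContextAdapter:
   // exposes a NewContext through the OldContext interface.
   final class NewToOldAdapter implements OldContext {
       private final NewContext delegate;

       private NewToOldAdapter(final NewContext delegate) {
           this.delegate = delegate;
       }

       static OldContext adapt(final NewContext context) {
           return new NewToOldAdapter(context);
       }

       @Override
       public String applicationId() {
           return delegate.applicationId();
       }
   }

   interface Store {
       // Old API: every existing implementation already overrides this.
       void init(OldContext context);

       // New API: the default adapts the context and delegates to the old method,
       // so implementations written against the old API keep working unchanged.
       default void init(final NewContext context) {
           init(NewToOldAdapter.adapt(context));
       }
   }

   // A store written only against the old API.
   final class LegacyStore implements Store {
       String initializedFor;

       @Override
       public void init(final OldContext context) {
           initializedFor = context.applicationId();
       }
   }
   ```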

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -140,76 +138,25 @@ Cancellable schedule(final Duration interval,
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);

Review comment:
       Migrated to the new Record argument.
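
   A minimal model of the two `forward` overloads in the new API, `forward(Record)` and `forward(Record, String childName)`. `Rec` and `MiniContext` are hypothetical stand-ins. The real context routes through processor nodes, and treating an unknown child name as an `IllegalArgumentException` is an assumption of this sketch.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Consumer;

   // Hypothetical minimal record: key, value and timestamp travel together.
   final class Rec<K, V> {
       final K key;
       final V value;
       final long timestamp;

       Rec(final K key, final V value, final long timestamp) {
           this.key = key;
           this.value = value;
           this.timestamp = timestamp;
       }
   }

   // Hypothetical context that only models the two forward overloads.
   final class MiniContext<K, V> {
       // Insertion order stands in for the topology's child order.
       private final Map<String, Consumer<Rec<K, V>>> children = new LinkedHashMap<>();

       void addChild(final String name, final Consumer<Rec<K, V>> child) {
           children.put(name, child);
       }

       // forward(record): every child receives the same record.
       void forward(final Rec<K, V> record) {
           for (final Consumer<Rec<K, V>> child : children.values()) {
               child.accept(record);
           }
       }

       // forward(record, childName): only the named child receives it.
       // Assumption: an unknown name is an error; this sketch throws IllegalArgumentException.
       void forward(final Rec<K, V> record, final String childName) {
           final Consumer<Rec<K, V>> child = children.get(childName);
           if (child == null) {
               throw new IllegalArgumentException("Unknown child processor: " + childName);
           }
           child.accept(record);
       }
   }
   ```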

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
##########
@@ -29,7 +30,7 @@
 import static java.util.Objects.requireNonNull;
 import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
 
-public class ProcessorRecordContext implements RecordContext {
+public class ProcessorRecordContext implements RecordContext, RecordMetadata {

Review comment:
       Here's the implementation of RecordMetadata.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
##########
@@ -49,26 +50,38 @@ protected StateManager stateManager() {
         return stateManager;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {
         final StateStore store = stateManager.getGlobalStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public <KIn, VIn> void forward(final KIn key, final VIn value) {
+    public <K, V> void forward(final Record<K, V> record) {
         final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
         try {
             for (final ProcessorNode<?, ?, ?, ?> child : currentNode().children()) {
                 setCurrentNode(child);
-                ((ProcessorNode<KIn, VIn, ?, ?>) child).process(key, value);
+                ((ProcessorNode<K, V, ?, ?>) child).process(record);
             }
         } finally {
             setCurrentNode(previousNode);
         }
     }
 
+    @Override
+    public <K, V> void forward(final Record<K, V> record, final String childName) {
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");

Review comment:
       Just implementing the new APIs while preserving the existing patterns.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1230,10 +1232,10 @@ public void shouldNotShareHeadersBetweenPunctuateIterations() {
         task.completeRestoration();
 
         task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
-            task.processorContext().recordContext().headers().add("dummy", (byte[]) null);
+            task.processorContext().headers().add("dummy", (byte[]) null);

Review comment:
       This test was actually testing a slightly wrong thing: `recordContext` was never exposed to users, who would have accessed the headers as `processorContext.headers()`. It matters here because I've refactored the internal code to set `recordContext` to `null` when there is no defined record context (such as in a punctuation, as here).

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
##########
@@ -57,11 +59,11 @@ public void setUp() {
     private void init() {
         EasyMock.expect(context.taskId()).andReturn(taskId);
         EasyMock.expect(context.recordCollector()).andReturn(collector);
-        inner.init(context, store);
+        inner.init((ProcessorContext) context, store);

Review comment:
       Just a quick note: we still expect the old init method to be invoked on the inner store. None of the wrapper stores implement the new init method yet, so they use the default implementation, which delegates to the old one. I'm going to take care of that in a follow-on PR.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
##########
@@ -65,25 +66,19 @@ public void init(final ProcessorContext<KOut, VOut> context) {
             scheduleCancellable = context.schedule(
                 Duration.ofMillis(scheduleInterval),
                 punctuationType,
-                timestamp -> {
-                    if (punctuationType == PunctuationType.STREAM_TIME) {
-                        assertThat(context.timestamp(), is(timestamp));
-                    }
-                    assertThat(context.partition(), is(-1));
-                    assertThat(context.offset(), is(-1L));
-
-                    (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
-                        .add(timestamp);
-                });
+                (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)::add

Review comment:
       This can become a method reference because the assertions on partition and offset are no longer meaningful.
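
   A standalone sketch of the same idiom, with hypothetical stand-ins for `Punctuator` and `PunctuationType`. The conditional picks the target list once, when the method reference is created. Each later `punctuate(t)` call boxes `t` and appends it to that list. `List.add` returns a `boolean`, but a method reference to it is still compatible with a `void` functional interface.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-ins for the Streams punctuation types.
   @FunctionalInterface
   interface Punctuator {
       void punctuate(long timestamp);
   }

   enum PunctuationType { STREAM_TIME, WALL_CLOCK_TIME }

   final class PunctuationRecorder {
       final List<Long> punctuatedStreamTime = new ArrayList<>();
       final List<Long> punctuatedSystemTime = new ArrayList<>();

       // The receiver expression is evaluated once, here; the returned
       // callback always appends to the list chosen at this point.
       Punctuator callbackFor(final PunctuationType type) {
           return (type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)::add;
       }
   }
   ```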

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
##########
@@ -148,16 +149,16 @@ public TopologyTestDriverTest(final boolean eosEnabled) {
         }
     }
 
-    private final static class Record {
+    private final static class TTDTestRecord {

Review comment:
       This was a bit funny to run into this late in the implementation: the test already had a class called `Record`, and now it needs to reference both that class and the new `Record`. I felt it was more readable to give the test's class a new name than to reference one of them by its fully qualified name.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
##########
@@ -46,12 +46,11 @@
     default void init(final ProcessorContext<KOut, VOut> context) {}
 
     /**
-     * Process the record with the given key and value.
+     * Process the record. Note that record metadata is undefined in cases such as a forward call from a punctuator.
      *
-     * @param key the key for the record
-     * @param value the value for the record
+     * @param record the record to process
      */
-    void process(KIn key, VIn value);
+    void process(Record<KIn, VIn> record);

Review comment:
       The new Processor API (with Record) proposed in the KIP.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * State store context interface.
+ */
+public interface StateStoreContext {

Review comment:
       The new state store context proposed in the KIP.

##########
File path: checkstyle/suppressions.xml
##########
@@ -185,7 +185,7 @@
 
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              files="StreamThreadTest.java"/>
+              files="(StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>

Review comment:
       These tests now have a couple more imports, which pushed them over the `ClassFanOutComplexity` limit. They may drop below the limit again once the transition to KIP-478 is complete. Otherwise, we should refactor these tests to comply with the limit, but we should do that separately.
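
   A quick way to sanity-check the widened pattern. This assumes the `files` value is a Java regular expression searched for anywhere in the file path; the Checkstyle `SuppressionFilter` docs give the exact matching rule. `SuppressionPatternCheck` is a hypothetical helper. Note that the unescaped `.` matches any character, so `\\.java` would be the stricter spelling.

   ```java
   import java.util.regex.Pattern;

   final class SuppressionPatternCheck {
       // The files pattern from the suppression above, copied verbatim.
       static final Pattern FILES =
           Pattern.compile("(StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java");

       // Assumption: the pattern may match anywhere in the path (find(), not matches()).
       static boolean suppressed(final String path) {
           return FILES.matcher(path).find();
       }
   }
   ```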

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -55,6 +52,16 @@
      */
     TaskId taskId();
 
+    /**
+     * The metadata of the record, if it is defined. Note that as long as the processor is
+     * receiving a record downstream of a Source (i.e., the current record is coming from an
+     * input topic), the metadata is defined. On the other hand, if a parent processor has
+     * registered a punctuator and called {@link ProcessorContext#forward(Record)} from that
+     * punctuator, then there is no record from an input topic, and therefore the metadata
+     * would be undefined.
+     */
+    Optional<RecordMetadata> recordMetadata();

Review comment:
       The new RecordMetadata context proposed in the KIP. Hopefully, the Javadoc is clear on why it's Optional.
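
   A sketch of how a processor might consume the `Optional`. It uses a hypothetical `Metadata` stand-in for `RecordMetadata` (topic, partition, offset) and a hypothetical `Provenance` helper. When there is an input record, the metadata is present. In a punctuation, or downstream of a forward from one, the caller gets an empty `Optional` instead of sentinel values like `-1`.

   ```java
   import java.util.Optional;

   // Hypothetical stand-in for RecordMetadata: where the current input record came from.
   final class Metadata {
       final String topic;
       final int partition;
       final long offset;

       Metadata(final String topic, final int partition, final long offset) {
           this.topic = topic;
           this.partition = partition;
           this.offset = offset;
       }
   }

   final class Provenance {
       // Present for records from an input topic; empty when the record was
       // forwarded from a punctuator, so there is nothing to report.
       static String describe(final Optional<Metadata> metadata) {
           return metadata
               .map(m -> m.topic + "-" + m.partition + "@" + m.offset)
               .orElse("no input record");
       }
   }
   ```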

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    /**
+     * The full constructor, specifying all the attributes of the record.
+     *
+     * @param key The key of the record. May be null.
+     * @param value The value of the record. May be null.
+     * @param timestamp The timestamp of the record. May not be negative.
+     * @param headers The headers of the record. May be null, which will cause subsequent calls
+     *                to {@link this#headers()} to return a non-null, empty, {@link Headers} collection.
+     *
+     * @throws IllegalArgumentException if the timestamp is negative.
+     */
+    public Record(final K key, final V value, final long timestamp, final Headers headers) {
+        this.key = key;
+        this.value = value;
+        if (timestamp < 0) {
+            throw new StreamsException(
+                "Malformed Record",
+                new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
+            );
+        }
+        this.timestamp = timestamp;
+        this.headers = new RecordHeaders(headers);

Review comment:
       Note: each time we create a new Record, we copy the headers. This is an improvement over the current situation, where there are no mutability barriers across the whole subtopology, so changes to headers in one processor can have unexpected effects on other processors that are very far away in the dependency diagram.
   
   However, it doesn't completely solve the problem: changes in children can still be visible to parents and siblings. @mjsax and I discussed providing a completely immutable (copy-on-write) implementation of Headers as a complete solution, but that seems to carry a pretty severe performance penalty. Instead, perhaps we can just document a safe pattern. E.g.,
   ```
   record = new Record(...)
   context.forward(record, "childA")
   record.headers().add(new header)
   // or
   record = record.withHeaders(record.headers().add(new header))
   context.forward(record, "childB")
   ```
   is unsafe because childA may modify the headers, affecting both the parent and childB. Instead, you should do something like:
   
   ```
   record1 = new Record(...)
   record2 = new Record(...)
   record2.headers().add(new header)
   context.forward(record1, "childA")
   context.forward(record2, "childB")
   ```
   Now, the headers for both children are completely independent objects.
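
   A standalone sketch of why the second pattern is safe. It assumes a hypothetical `MutableHeaders` class in place of Kafka's `Headers`, and a `Rec` class that copies its headers on construction, as the `Record` constructor above does. `HeaderIsolationDemo` is likewise hypothetical. The assignment `seenByChildA = record` stands in for forwarding.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical mutable header collection (stands in for Kafka's Headers).
   final class MutableHeaders {
       private final List<String> entries;

       MutableHeaders() {
           this.entries = new ArrayList<>();
       }

       // Copy constructor; a null source yields an empty collection.
       MutableHeaders(final MutableHeaders other) {
           this.entries = other == null ? new ArrayList<>() : new ArrayList<>(other.entries);
       }

       MutableHeaders add(final String header) {
           entries.add(header);
           return this;
       }

       List<String> entries() {
           return entries;
       }
   }

   // Hypothetical record: copies headers on construction, but the copy itself is mutable.
   final class Rec {
       private final String key;
       private final MutableHeaders headers;

       Rec(final String key, final MutableHeaders headers) {
           this.key = key;
           this.headers = new MutableHeaders(headers);
       }

       String key() {
           return key;
       }

       MutableHeaders headers() {
           return headers;
       }
   }

   final class HeaderIsolationDemo {
       // Unsafe: the same record, and so the same headers, goes to childA and childB.
       static boolean unsafeShares() {
           final Rec record = new Rec("k", null);
           final Rec seenByChildA = record;       // what forward(record, "childA") hands over
           record.headers().add("new-header");    // later mutation is visible to childA
           return seenByChildA.headers().entries().contains("new-header");
       }

       // Safe: each child gets its own record, so each owns an independent header copy.
       static boolean safeIsolates() {
           final MutableHeaders original = new MutableHeaders().add("h1");
           final Rec record1 = new Rec("k", original);
           final Rec record2 = new Rec("k", original);
           record2.headers().add("new-header");
           return !record1.headers().entries().contains("new-header")
               && !original.entries().contains("new-header");
       }
   }
   ```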

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -140,76 +138,25 @@ Cancellable schedule(final Duration interval,
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);
 
     /**
-     * Forwards a key/value pair to the specified downstream processors.
-     * Can be used to set the timestamp of the output record.
+     * Forwards a record to the specified child processor.
      *
-     * @param key key
-     * @param value value
-     * @param to the options to use when forwarding
+     * @param record The record to forward
+     * @param childName The name of the child processor to receive the record
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record, final String childName);
 
     /**
      * Requests a commit.
      */
     void commit();
 
-    /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the topic name
-     */
-    String topic();
-
-    /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the partition id
-     */
-    int partition();
-
-    /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the offset
-     */
-    long offset();
-
-    /**
-     * Returns the headers of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the headers
-     */
-    Headers headers();
-
-    /**
-     * Returns the current timestamp.
-     *
-     * <p> If it is triggered while processing a record streamed from the source processor,
-     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
-     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
-     *
-     * <p> If it is triggered while processing a record generated not from the source processor (for example,
-     * if this method is invoked from the punctuate call), timestamp is defined as the current
-     * task's stream time, which is defined as the largest timestamp of any record processed by the task.
-     *
-     * @return the timestamp
-     */
-    long timestamp();

Review comment:
       `headers()` and `timestamp()` are moved to `Record`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -147,7 +148,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+            stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       Casts like this are scattered throughout this PR. They just select the `init` overload we want to invoke, and they're only necessary because the `globalProcessorContext` here implements both `ProcessorContext` and `StateStoreContext`. This is only true of our internal contexts, so users will not need to make a similar change.
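
   Why the cast is needed can be shown with plain Java overloading. `OldContext`, `NewContext`, `InternalContext`, `GlobalContext`, `RecordingStore`, and `InitDemo` are hypothetical stand-ins. When the argument's static type implements both parameter types, neither `init` overload is more specific, so the compiler rejects the call as ambiguous until a cast picks one.

   ```java
   // Hypothetical stand-ins for the old ProcessorContext and the new StateStoreContext.
   interface OldContext {
       String taskId();
   }

   interface NewContext {
       String taskId();
   }

   // Like InternalProcessorContext, the internal type implements both.
   interface InternalContext extends OldContext, NewContext { }

   final class GlobalContext implements InternalContext {
       @Override
       public String taskId() {
           return "global";
       }
   }

   // Records which init overload ran.
   final class RecordingStore {
       String lastInit;

       void init(final OldContext context) {
           lastInit = "old";
       }

       void init(final NewContext context) {
           lastInit = "new";
       }
   }

   final class InitDemo {
       static void initNew(final RecordingStore store, final InternalContext context) {
           // store.init(context) would not compile ("reference to init is ambiguous"):
           // context is both an OldContext and a NewContext, and neither overload
           // is more specific. The cast selects the overload explicitly.
           store.init((NewContext) context);
       }
   }
   ```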

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
##########
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {

Review comment:
       Don't need this adapter anymore, either.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -140,76 +138,25 @@ Cancellable schedule(final Duration interval,
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);
 
     /**
-     * Forwards a key/value pair to the specified downstream processors.
-     * Can be used to set the timestamp of the output record.
+     * Forwards a record to the specified child processor.
      *
-     * @param key key
-     * @param value value
-     * @param to the options to use when forwarding
+     * @param record The record to forward
+     * @param childName The name of the child processor to receive the record
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record, final String childName);
 
     /**
      * Requests a commit.
      */
     void commit();
 
-    /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the topic name
-     */
-    String topic();
-
-    /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the partition id
-     */
-    int partition();
-
-    /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the offset
-     */
-    long offset();

Review comment:
       `topic()`, `partition()`, and `offset()` are moved to `RecordMetadata`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##########
@@ -114,7 +117,7 @@ public void init(final InternalProcessorContext context) {
             maybeMeasureLatency(
                 () -> {
                     if (processor != null) {
-                        processor.init(ProcessorContextAdapter.adapt(context));
+                        processor.init((ProcessorContext<KOut, VOut>) context);

Review comment:
       Casting to add the generic params (the InternalProcessorContext is parameterized as `<Object, Object>`).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
##########
@@ -104,7 +105,13 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {
                     deserialized.headers());
             processorContext.setRecordContext(recordContext);
             processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
-            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(deserialized.key(), deserialized.value());
+            final Record<Object, Object> toProcess = new Record<>(
+                deserialized.key(),
+                deserialized.value(),
+                processorContext.timestamp(),
+                processorContext.headers()
+            );
+            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);

Review comment:
       This is a pretty common pattern where we need to bridge the new and old APIs. We construct the "record" by filling in the timestamp and headers from the context.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -866,9 +867,9 @@ public void init(final ProcessorContext<String, String> context) {
                     }
 
                     @Override
-                    public void process(final String key, final String value) {
-                        if (value.length() % 2 == 0) {
-                            context.forward(key, key + value);
+                    public void process(final Record<String, String> record) {
+                        if (record.value().length() % 2 == 0) {
+                            context.forward(record.withValue(record.key() + record.value()));

Review comment:
       A good example of updating just the value for the child.
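
   A sketch of the `withValue` idea, using a hypothetical immutable `Rec<K, V>` stand-in rather than the real `Record` class, plus a hypothetical `EvenLengthConcat` helper that mirrors the processor above. `withValue` returns a new record with the same key and timestamp and leaves the original untouched. That is what makes forwarding an "update just the value" result safe.

   ```java
   import java.util.Optional;

   // Hypothetical immutable record (a stand-in, not the real Record class).
   final class Rec<K, V> {
       private final K key;
       private final V value;
       private final long timestamp;

       Rec(final K key, final V value, final long timestamp) {
           this.key = key;
           this.value = value;
           this.timestamp = timestamp;
       }

       K key() { return key; }
       V value() { return value; }
       long timestamp() { return timestamp; }

       // Returns a new record carrying a different value; key and timestamp
       // carry over unchanged, and this instance is not modified.
       <NewV> Rec<K, NewV> withValue(final NewV newValue) {
           return new Rec<>(key, newValue, timestamp);
       }
   }

   final class EvenLengthConcat {
       // Only even-length values produce output, and the output value is key + value.
       static Optional<Rec<String, String>> process(final Rec<String, String> input) {
           if (input.value().length() % 2 == 0) {
               return Optional.of(input.withValue(input.key() + input.value()));
           }
           return Optional.empty();
       }
   }
   ```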

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
##########
@@ -33,7 +34,9 @@
  * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
  * {@link ThreadCache}
  */
-public interface InternalProcessorContext extends ProcessorContext {
+public interface InternalProcessorContext
+    extends ProcessorContext, org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>, StateStoreContext {

Review comment:
       Thanks to this, all our internal Impls are suitable to pass in to any of the new APIs.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
##########
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.RecordContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
-
-/**
- * For internal use so we can update the {@link RecordContext} and current
- * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
- * {@link ThreadCache}
- */
-public interface InternalApiProcessorContext<KForward, VForward> extends ProcessorContext<KForward, VForward> {

Review comment:
       We don't need this anymore, since InternalProcessorContext now extends all of the new and old contexts.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -742,8 +760,7 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
             throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix));
         }
 
-        updateProcessorContext(new StampedRecord(new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null),
-            timestamp), node, time.milliseconds());
+        updateProcessorContext(node, time.milliseconds(), null);

Review comment:
       Instead of setting a dummy context, we're now just setting the context to `null` aka "undefined".

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {

Review comment:
       The new Record class proposed in the KIP.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+public final class StoreToProcessorContextAdapter implements ProcessorContext {
+    private final StateStoreContext delegate;
+
+    public static ProcessorContext adapt(final StateStoreContext delegate) {
+        if (delegate instanceof ProcessorContext) {
+            return (ProcessorContext) delegate;
+        } else {
+            return new StoreToProcessorContextAdapter(delegate);
+        }
+    }

Review comment:
       This allows us to transparently delegate the new API to the old one for StateStore implementations. Our internal StateStoreContext implementations all implement both APIs, so they are simply cast. If you use a separate implementation of StateStoreContext (e.g., in unit tests), it gets wrapped in the adapter instead, which works as long as the underlying store implementation doesn't try to call `forward` or anything similar.
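The cast-or-wrap logic of `adapt` can be sketched without Kafka on the classpath. `SimpleStoreContext`, `SimpleLegacyContext` and `StoreToLegacyAdapter` are hypothetical. Throwing `UnsupportedOperationException` from `forward` is this sketch's assumption about how a store-only context should react to processor-only calls.

```java
// Hypothetical minimal interfaces: SimpleStoreContext plays the role of
// StateStoreContext, SimpleLegacyContext the role of the old ProcessorContext.
interface SimpleStoreContext {
    String applicationId();
}

interface SimpleLegacyContext {
    String applicationId();
    void forward(Object key, Object value);
}

final class StoreToLegacyAdapter implements SimpleLegacyContext {
    private final SimpleStoreContext delegate;

    private StoreToLegacyAdapter(final SimpleStoreContext delegate) {
        this.delegate = delegate;
    }

    // If the context already implements the legacy interface, just cast it;
    // otherwise wrap it so store-level calls are delegated.
    static SimpleLegacyContext adapt(final SimpleStoreContext delegate) {
        if (delegate instanceof SimpleLegacyContext) {
            return (SimpleLegacyContext) delegate;
        }
        return new StoreToLegacyAdapter(delegate);
    }

    @Override
    public String applicationId() {
        return delegate.applicationId(); // store-level method passes straight through
    }

    @Override
    public void forward(final Object key, final Object value) {
        // A store-only context has no downstream nodes to forward to.
        throw new UnsupportedOperationException("forward() is not available from a store context");
    }
}
```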

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
##########
@@ -86,7 +86,7 @@ public void register(final StateStore store,
     }
 
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {

Review comment:
       Just implementing the interface.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -112,64 +114,69 @@ public void register(final StateStore store,
         stateManager().registerStore(store, stateRestoreCallback);
     }
 
-    /**
-     * @throws IllegalStateException if the task's record is null
-     */
     @Override
     public String topic() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
-        }
-
-        final String topic = recordContext.topic();
-
-        if (NONEXIST_TOPIC.equals(topic)) {
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For topic, the dummy value is `null`.

Review comment:
       The prior code was a bit misleading. I could not find _any_ code path where the context was actually null before, since we always initialized it with a "dummy context". This change simplifies the codebase by moving the dummy values here; we now actually set the record context to `null` to signify (internally) that it is undefined.
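The new convention is shown in a minimal sketch below. A `null` record context means "undefined", and the legacy accessors return dummy values instead of throwing. `LegacyAccessors` is hypothetical. The `null` topic comes from the comment in the diff. The `-1` partition and offset defaults are an assumption, borrowed from the old dummy `ConsumerRecord<>(NONEXIST_TOPIC, -1, -1L, null, null)`.

```java
// Hypothetical sketch of legacy accessors over a possibly-null record context.
final class LegacyAccessors {
    static final class RecordCtx {
        final String topic;
        final int partition;
        final long offset;

        RecordCtx(final String topic, final int partition, final long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    private RecordCtx recordContext; // null means "no record is being processed"

    void setRecordContext(final RecordCtx recordContext) {
        this.recordContext = recordContext;
    }

    // Dummy value for an undefined context: null topic.
    String topic() {
        return recordContext == null ? null : recordContext.topic;
    }

    // Dummy value (assumed, see lead-in): -1 partition.
    int partition() {
        return recordContext == null ? -1 : recordContext.partition;
    }

    // Dummy value (assumed, see lead-in): -1 offset.
    long offset() {
        return recordContext == null ? -1L : recordContext.offset;
    }
}
```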

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
##########
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextAdapter<KForward, VForward>
-    implements ProcessorContext<KForward, VForward>, InternalApiProcessorContext<KForward, VForward> {

Review comment:
       We also don't need these adapters anymore.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
##########
@@ -23,6 +23,10 @@ public ToInternal() {
         super(To.all());
     }
 
+    public ToInternal(final To to) {
+        super(to);
+    }

Review comment:
       A copy constructor helped with the ProcessorContextImpl refactoring.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
##########
@@ -60,7 +61,7 @@ public void init(final ProcessorContext context) {
                     context.schedule(
                         Duration.ofMillis(1),
                         PunctuationType.WALL_CLOCK_TIME,
-                        timestamp -> context.forward(-1, (int) timestamp)
+                        timestamp -> context.forward(-1, (int) timestamp, To.all().withTimestamp(timestamp))

Review comment:
       The prior code here was actually relying on a strange effect in which we set the (undefined) processor context's timestamp to the punctuation time. I could preserve that behavior, but it looked like a bug to me.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.testutil;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+
+public final class ConsumerRecordUtil {
+    private ConsumerRecordUtil() {}
+
+    public static <K, V> ConsumerRecord<K, V> record(final String topic,
+                                                     final int partition,
+                                                     final long offset,
+                                                     final K key,
+                                                     final V value) {
+        // the no-time constructor in ConsumerRecord initializes the
+        // timestamp to -1, which is an invalid configuration. Here,
+        // we initialize it to 0.
+        return new ConsumerRecord<>(
+            topic,
+            partition,
+            offset,
+            0L,
+            TimestampType.CREATE_TIME,
+            0L,
+            0,
+            0,
+            key,
+            value
+        );
+    }

Review comment:
       This is the utility method I replaced the ConsumerRecord constructor with earlier in the PR.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -775,8 +775,8 @@ public void init(final ProcessorContext<String, String> context) {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value);
+        public void process(final Record<String, String> record) {
+            context.forward(record);

Review comment:
       just a simple passthrough

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+public interface RecordMetadata {

Review comment:
       The new metadata proposed in the KIP. Note that it's an interface because in reality, it's just going to be a view onto the ProcessorRecordContext.
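The "view" idea can be shown with an interface implemented directly by the internal context class. Exposing metadata then hands out the existing object instead of a copy. `RecordMeta`, `InternalRecordContext` and `MetaAccess` are hypothetical names used only for illustration.

```java
// Hypothetical read-only metadata interface.
interface RecordMeta {
    String topic();
    int partition();
    long offset();
}

// Internal context: implements the public read-only interface and also
// carries internal-only state that callers of RecordMeta cannot see.
final class InternalRecordContext implements RecordMeta {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp; // not part of RecordMeta in this sketch

    InternalRecordContext(final String topic, final int partition, final long offset, final long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }

    long timestamp() { return timestamp; }
}

final class MetaAccess {
    private InternalRecordContext current;

    void setCurrent(final InternalRecordContext current) {
        this.current = current;
    }

    // Returning the interface type exposes a view of the same object; nothing is copied.
    RecordMeta recordMetadata() {
        return current;
    }
}
```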

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
##########
@@ -127,7 +128,7 @@ public void shouldInitializeProcessorTopology() {
     @Test
     public void shouldProcessRecordsForTopic() {
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>(topic1, 1, 1, "foo".getBytes(), "bar".getBytes()));
+        globalStateTask.update(record(topic1, 1, 1, "foo".getBytes(), "bar".getBytes()));

Review comment:
       That constructor of ConsumerRecord sets the timestamp to `-1`, which is now prohibited: we construct a Record before processing, and Record rejects negative timestamps.
   
   This seems fine to me, since it would only happen in unit tests (as ConsumerRecords returned from the broker never have negative timestamps).

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -792,8 +792,8 @@ public void init(final ProcessorContext<String, String> context) {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value, To.all().withTimestamp(context.timestamp() + 10));
+        public void process(final Record<String, String> record) {
+            context.forward(record.withTimestamp(record.timestamp() + 10));

Review comment:
       An example of setting only the timestamp.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
##########
@@ -55,7 +56,7 @@ public void before() {
             collector,
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         context.setTime(0);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);

Review comment:
       Words cannot express how tired of this change I was by this point.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -466,10 +467,11 @@ public void init(final InternalProcessorContext context) {
                 this.context = context;
                 super.init(context);
             }
+
             @Override
-            public void process(final Integer key, final Integer value) {
-                if (key % 2 == 0) {
-                    context.forward(key, value);
+            public void process(final Record<Integer, Integer> record) {
+                if (record.key() % 2 == 0) {
+                    context.forward(record);

Review comment:
       Example of filtering but otherwise passing the record through.
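The same filter can be written without any dependencies. Each record is either forwarded as-is, with the same key, value and timestamp, or dropped. `EvenKeyFilter` is hypothetical, and a `Consumer` stands in for `context.forward(record)`.

```java
import java.util.function.Consumer;

// Hypothetical filtering processor: forwards the incoming record object
// unchanged when its key is even, and drops it otherwise.
final class EvenKeyFilter {
    static final class Rec {
        final int key;
        final int value;
        final long timestamp;

        Rec(final int key, final int value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Consumer<Rec> forward; // stands in for context.forward(record)

    EvenKeyFilter(final Consumer<Rec> forward) {
        this.forward = forward;
    }

    void process(final Rec record) {
        if (record.key % 2 == 0) {
            forward.accept(record); // pass the same object through untouched
        }
    }
}
```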

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
##########
@@ -81,14 +84,9 @@ public void shouldThrowNullPointerOnRegisterIfStateStoreIsNull() {
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnTopicIfNoRecordContext() {
+    public void shouldReturnNullTopicIfNoRecordContext() {

Review comment:
       These tests were enforcing a behavior that would never have actually happened in practice. Since I changed these methods to return the dummy values when the context is undefined, these tests also have to change.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -670,9 +671,26 @@ public boolean process(final long wallClockTime) {
 
             log.trace("Start processing one record [{}]", record);
 
-            updateProcessorContext(record, currNode, wallClockTime);
+            updateProcessorContext(
+                currNode,
+                wallClockTime,
+                new ProcessorRecordContext(
+                    record.timestamp,
+                    record.offset(),
+                    record.partition(),
+                    record.topic(),
+                    record.headers()
+                )
+            );
+
             maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
-            maybeMeasureLatency(() -> currNode.process(record.key(), record.value()), time, processLatencySensor);
+            final Record<Object, Object> toProcess = new Record<>(
+                record.key(),
+                record.value(),
+                processorContext.timestamp(),
+                processorContext.headers()

Review comment:
       This is pulling out the timestamp and headers that we just set a few lines earlier.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -967,11 +968,11 @@ public void init(final ProcessorContext<String, String> context) {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
+                public void process(final Record<String, String> record) {
                     final KeyValueStore<String, Long> kvStore = context.getStateStore(storeName);
-                    kvStore.put(key, 5L);
+                    kvStore.put(record.key(), 5L);
 
-                    context.forward(key, "5");
+                    context.forward(record.withValue("5"));

Review comment:
       You're going to see a lot of these in the tests.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    /**
+     * The full constructor, specifying all the attributes of the record.
+     *
+     * @param key The key of the record. May be null.
+     * @param value The value of the record. May be null.
+     * @param timestamp The timestamp of the record. May not be negative.
+     * @param headers The headers of the record. May be null, which will cause subsequent calls
+     *                to {@link this#headers()} to return a non-null, empty, {@link Headers} collection.
+     *
+     * @throws IllegalArgumentException if the timestamp is negative.
+     */
+    public Record(final K key, final V value, final long timestamp, final Headers headers) {
+        this.key = key;
+        this.value = value;
+        if (timestamp < 0) {
+            throw new StreamsException(
+                "Malformed Record",
+                new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
+            );
+        }

Review comment:
       We used to check this only in SinkNode, but it seems better to fail fast since we actually have the opportunity to do so now.
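Fail-fast validation is illustrated below with a hypothetical `CheckedRecord` class. A negative timestamp is rejected in the constructor, so the error surfaces where the record is created instead of later at the sink. The diff above wraps an `IllegalArgumentException` in a `StreamsException`. This sketch throws `IllegalArgumentException` directly so that it has no Kafka dependencies.

```java
// Hypothetical record type that validates its timestamp on construction.
final class CheckedRecord {
    final String key;
    final String value;
    final long timestamp;

    CheckedRecord(final String key, final String value, final long timestamp) {
        // Fail fast: reject bad input before the record can travel downstream.
        if (timestamp < 0) {
            throw new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp);
        }
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}
```

According to the comments above, `-1` is exactly what the timestamp-less `ConsumerRecord` constructor produced. That is why the test helper in `ConsumerRecordUtil` uses `0L` instead.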




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org