You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/01 17:08:52 UTC

[GitHub] [kafka] vvcephei opened a new pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

vvcephei opened a new pull request #9361:
URL: https://github.com/apache/kafka/pull/9361


   Migrate different components of the old ProcessorContext interface
   into separate interfaces that are more appropriate for their usages.
   See KIP-478 for the details.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498951242



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -159,84 +157,123 @@ public StateStore getStateStore(final String name) {
         }
 
         final StateStore store = stateManager.getStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @Override
     public <K, V> void forward(final K key,
                                final V value) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, SEND_TO_ALL);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward);
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final int childIndex) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(
+        final Record<K, V> toForward = new Record<>(
             key,
             value,
-            To.child((currentNode().children()).get(childIndex).name()));
+            timestamp(),
+            headers()
+        );
+        forward(toForward, (currentNode().children()).get(childIndex).name());
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final String childName) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, To.child(childName));
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward, childName);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key,
                                final V value,
                                final To to) {
+        final ToInternal toInternal = new ToInternal(to);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+            headers()
+        );
+        forward(toForward, toInternal.child());
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @SuppressWarnings("unchecked")

Review comment:
       Thanks for your thoughts, and I agree it is too stretch for KIP-478 to extend to that. Let's just keep it as is now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498560275



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.

Review comment:
       nit: see my other comment above, maybe we can leave some guidance on when it is preferable to reuse the Record with mutable fields than creating a new Record object?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -159,84 +157,123 @@ public StateStore getStateStore(final String name) {
         }
 
         final StateStore store = stateManager.getStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @Override
     public <K, V> void forward(final K key,
                                final V value) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, SEND_TO_ALL);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward);
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final int childIndex) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(
+        final Record<K, V> toForward = new Record<>(
             key,
             value,
-            To.child((currentNode().children()).get(childIndex).name()));
+            timestamp(),
+            headers()
+        );
+        forward(toForward, (currentNode().children()).get(childIndex).name());
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final String childName) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, To.child(childName));
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward, childName);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key,
                                final V value,
                                final To to) {
+        final ToInternal toInternal = new ToInternal(to);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+            headers()
+        );
+        forward(toForward, toInternal.child());
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @SuppressWarnings("unchecked")

Review comment:
       I was hoping that with the new strong typing API, during forwarding we do not need to cast to `(ProcessorNode<K, V, ?, ?>)` and not need this suppression any more.. Could we add the typing into `currentNode` (e.g. validate that the currentNode.children is indeed in `K, V`) instead of force casting?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    /**
+     * The full constructor, specifying all the attributes of the record.
+     *
+     * @param key The key of the record. May be null.
+     * @param value The value of the record. May be null.
+     * @param timestamp The timestamp of the record. May not be negative.
+     * @param headers The headers of the record. May be null, which will cause subsequent calls
+     *                to {@link this#headers()} to return a non-null, empty, {@link Headers} collection.
+     *
+     * @throws IllegalArgumentException if the timestamp is negative.
+     */
+    public Record(final K key, final V value, final long timestamp, final Headers headers) {
+        this.key = key;
+        this.value = value;
+        if (timestamp < 0) {
+            throw new StreamsException(
+                "Malformed Record",
+                new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
+            );
+        }
+        this.timestamp = timestamp;
+        this.headers = new RecordHeaders(headers);

Review comment:
       That looks reasonable to me. More generally I'd suggest to document either in this class or in the `forward` class what side-effects the user need to consider if they decided to reuse the object passed in as parameters and mutate its fields / forward to downstream. And as long as we do that I feel this would be okay.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -140,76 +138,25 @@ Cancellable schedule(final Duration interval,
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);

Review comment:
       I'd suggest we add a few more sentences about "always create a new Record upon forwarding" v.s. "reuse the object overriding its key/value/timestamp/header fields", e.g. which way is more plausible when.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -135,16 +132,17 @@ public void logChange(final String storeName,
      * @throws StreamsException if an attempt is made to access this state store from an unknown node
      * @throws UnsupportedOperationException if the current streamTask type is standby
      */
+    @SuppressWarnings("unchecked")

Review comment:
       Why we'd need to add this suppression?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498606066



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -159,84 +157,123 @@ public StateStore getStateStore(final String name) {
         }
 
         final StateStore store = stateManager.getStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @Override
     public <K, V> void forward(final K key,
                                final V value) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, SEND_TO_ALL);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward);
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final int childIndex) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(
+        final Record<K, V> toForward = new Record<>(
             key,
             value,
-            To.child((currentNode().children()).get(childIndex).name()));
+            timestamp(),
+            headers()
+        );
+        forward(toForward, (currentNode().children()).get(childIndex).name());
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final String childName) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, To.child(childName));
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward, childName);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key,
                                final V value,
                                final To to) {
+        final ToInternal toInternal = new ToInternal(to);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+            headers()
+        );
+        forward(toForward, toInternal.child());
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @SuppressWarnings("unchecked")

Review comment:
       I'll have to think about this, but offhand, it doesn't seem like that would work.
   
   In general, the type safety we're providing is on the user side of the PAPI (of course, the DSL internals is a user of the PAPI). On the internals side of the PAPI implementation, we generally can't get any more safety. For example, the ProcessorContextImpl is instantiated for the whole Task, which would handle a number of different record types and ProcessorNode types, so the internal implementation of the ProcessorContextImpl would have to be agnostic wrt the actual types of the nodes.
   
   I think that by the time I'm done with the full implementation of this KIP, you'll see most of the unchecked suppressions and casts gone from the DSL implementation, but there will still be some in classes like this one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498905668



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -55,6 +52,16 @@
      */
     TaskId taskId();
 
+    /**
+     * The metadata of the record, if it is defined. Note that as long as the processor is

Review comment:
       I've just updated the PR with this change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498591825



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -140,76 +138,25 @@ Cancellable schedule(final Duration interval,
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);

Review comment:
       Thanks, this sounds like a good place to put that guidance.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#issuecomment-703003770


   One unrelated failure:
   ```
   Build / JDK 11 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] failed
   ```
   
   I'll go ahead and merge, since Guozhang was in favor.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498591964



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    /**
+     * The full constructor, specifying all the attributes of the record.
+     *
+     * @param key The key of the record. May be null.
+     * @param value The value of the record. May be null.
+     * @param timestamp The timestamp of the record. May not be negative.
+     * @param headers The headers of the record. May be null, which will cause subsequent calls
+     *                to {@link this#headers()} to return a non-null, empty, {@link Headers} collection.
+     *
+     * @throws IllegalArgumentException if the timestamp is negative.
+     */
+    public Record(final K key, final V value, final long timestamp, final Headers headers) {
+        this.key = key;
+        this.value = value;
+        if (timestamp < 0) {
+            throw new StreamsException(
+                "Malformed Record",
+                new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
+            );
+        }
+        this.timestamp = timestamp;
+        this.headers = new RecordHeaders(headers);

Review comment:
       Thanks. I like the idea of putting it in forward. Maybe with a short mention here as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498592498



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -52,6 +52,16 @@
      */
     TaskId taskId();
 
+    /**
+     * The metadata of the record, if it is defined. Note that as long as the processor is
+     * receiving a record downstream of a Source (i.e., the current record is coming from an

Review comment:
       Sure!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498603554



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.

Review comment:
       Sounds good.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9361:
URL: https://github.com/apache/kafka/pull/9361


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498871305



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -692,8 +690,7 @@ public boolean process(final long wallClockTime) {
                 processorContext.timestamp(),
                 processorContext.headers()
             );
-            final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(processorContext.recordContext());
-            maybeMeasureLatency(() -> currNode.process(toProcess, recordMetadata), time, processLatencySensor);
+            maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);

Review comment:
       Yeah, I agree. I think this is much simpler for both the implementation and the API. Sorry I didn't see it at first ;) 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498915601



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -159,84 +157,123 @@ public StateStore getStateStore(final String name) {
         }
 
         final StateStore store = stateManager.getStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @Override
     public <K, V> void forward(final K key,
                                final V value) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, SEND_TO_ALL);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward);
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final int childIndex) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(
+        final Record<K, V> toForward = new Record<>(
             key,
             value,
-            To.child((currentNode().children()).get(childIndex).name()));
+            timestamp(),
+            headers()
+        );
+        forward(toForward, (currentNode().children()).get(childIndex).name());
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final String childName) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, To.child(childName));
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward, childName);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key,
                                final V value,
                                final To to) {
+        final ToInternal toInternal = new ToInternal(to);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+            headers()
+        );
+        forward(toForward, toInternal.child());
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @SuppressWarnings("unchecked")

Review comment:
       I think it would have been nice to get some kind of public-facing API into `Topology` like what I quoted above, but at this point, I think we can just cut scope on KIP-478 and implement it for 2.7 as-is. Then, we can follow up with a new KIP to add some kind of `Topology#attachChildToParent` API in a later KIP).
   
   Aside from cutting scope, I wanted to do it as an internal API while migrating the DSL internals so I could be sure the "attach" method is really the best approach.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498531149



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -692,8 +690,7 @@ public boolean process(final long wallClockTime) {
                 processorContext.timestamp(),
                 processorContext.headers()
             );
-            final Optional<RecordMetadata> recordMetadata = Optional.ofNullable(processorContext.recordContext());
-            maybeMeasureLatency(() -> currNode.process(toProcess, recordMetadata), time, processLatencySensor);
+            maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);

Review comment:
       I think this is a proof of code simplicity that recordContext is unnecessarily passing around here :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -52,6 +52,16 @@
      */
     TaskId taskId();
 
+    /**
+     * The metadata of the record, if it is defined. Note that as long as the processor is
+     * receiving a record downstream of a Source (i.e., the current record is coming from an

Review comment:
       nit: maybe also state that the metadata would be the one from the source record?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498608642



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -159,84 +157,123 @@ public StateStore getStateStore(final String name) {
         }
 
         final StateStore store = stateManager.getStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @Override
     public <K, V> void forward(final K key,
                                final V value) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, SEND_TO_ALL);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward);
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final int childIndex) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(
+        final Record<K, V> toForward = new Record<>(
             key,
             value,
-            To.child((currentNode().children()).get(childIndex).name()));
+            timestamp(),
+            headers()
+        );
+        forward(toForward, (currentNode().children()).get(childIndex).name());
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final String childName) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, To.child(childName));
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward, childName);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key,
                                final V value,
                                final To to) {
+        final ToInternal toInternal = new ToInternal(to);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+            headers()
+        );
+        forward(toForward, toInternal.child());
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @SuppressWarnings("unchecked")

Review comment:
       What I'm wondering is that, within a processor node of `<KIn, VIn, KOut, VOut>`, if a `context.forward()` is called with `Record<KOut2, VOut2>`, would we only throw a casting exception at runtime, or we could capture this at compiler time. From this suppression it seems we still cannot achieve the former.. do you think this can be improved alongside with this KIP too?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498914034



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -159,84 +157,123 @@ public StateStore getStateStore(final String name) {
         }
 
         final StateStore store = stateManager.getStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @Override
     public <K, V> void forward(final K key,
                                final V value) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, SEND_TO_ALL);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward);
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final int childIndex) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(
+        final Record<K, V> toForward = new Record<>(
             key,
             value,
-            To.child((currentNode().children()).get(childIndex).name()));
+            timestamp(),
+            headers()
+        );
+        forward(toForward, (currentNode().children()).get(childIndex).name());
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key,
                                final V value,
                                final String childName) {
-        throwUnsupportedOperationExceptionIfStandby("forward");
-        forward(key, value, To.child(childName));
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            timestamp(),
+            headers()
+        );
+        forward(toForward, childName);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key,
                                final V value,
                                final To to) {
+        final ToInternal toInternal = new ToInternal(to);
+        final Record<K, V> toForward = new Record<>(
+            key,
+            value,
+            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+            headers()
+        );
+        forward(toForward, toInternal.child());
+    }
+
+    @Override
+    public <K, V> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @SuppressWarnings("unchecked")

Review comment:
       I see.
   
   So, _within_ the `Processor<KIn, Vin, KOut, VOut>`, the `ProcessorContext` is bounded to `<KOut, VOut>`, and therefore `forward` will only accept a `Record<KOut, VOut>`. The compiler will check the source code of the processor and enforce this at compile time. It sounds like this is one part of what you are thinking about.
   
   But inside the `ProcessorContextImpl`, we're _outside_ of the `Processor`, and in the "plumbing" of Streams. In this context, we cannot have compile-time type safety, since we can't bound all of KafkaStreams to accept only one kind of record and processor.
   
   However, what we can check at compile time is that we only attach _compatible_ children to parents. This isn't a way to do this in `Topology` right now, and I didn't want to expand this KIP to that extent. My plan for getting the full benefit of this KIP in the DSL internals is to actually add an internal utility method for registering pairs of processors, like this:
   ```java
       void addGraphNode(final StreamsGraphNode<KIn, VIn, KIntermediate, VIntermediate> parent,
                         final StreamsGraphNode<KIntermediate, VIntermediate, KOut, VOut> child);
   ```
   Then, in addition to a compile-time guarantee that a processor can only forward its declared output type, we also get a guarantee that the builder can only attach compatible parent-child pairs. Assuming you don't do any casting in "user space",
   we don't need any further compiler checking to render cast exceptions impossible. The "plumbing code" like in this `ProcessorContextImpl` is akin to the JVM runtime after type erasure... the "compiler" has already done its job on the "source code" (the user-facing side of the PAPI), so we don't need the types anymore at "run time".




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498400505



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -49,7 +51,28 @@
      * Initializes this state store.
      * <p>
      * The implementation of this function must register the root store in the context via the
-     * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} function, where the
+     * {@link org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, StateRestoreCallback)} function,
+     * where the first {@link StateStore} parameter should always be the passed-in {@code root} object, and
+     * the second parameter should be an object of user's implementation
+     * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
+     * <p>
+     * Note that if the state store engine itself supports bulk writes, users can implement another
+     * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
+     * let users implement bulk-load restoration logic instead of restoring one record at a time.
+     * <p>
+     * This method is not called if {@link StateStore#init(StateStoreContext, StateStore)}
+     * is implemented.
+     *
+     * @throws IllegalStateException If store gets registered after initialized is already finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);

Review comment:
       Not sure what's up with this diff, but this is the old API.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -88,9 +88,12 @@ void register(final StateStore store,
      * Get the state store given the store name.
      *
      * @param name The store name
+     * @param <S> The type or interface of the store to return
      * @return The state store instance
+     *
+     * @throws ClassCastException if the return type isn't a type or interface of the actual returned store.
      */
-    StateStore getStateStore(final String name);
+    <S extends StateStore> S getStateStore(final String name);

Review comment:
       now that we can implement both the new and old contexts with the same Impl, we need this to resolve a clash. It's backward compatible and a nice quality-of-life improvement anyway.

##########
File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##########
@@ -160,24 +161,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
         @Override
         public void init(final ProcessorContext<String, Long> context) {
             this.context = context;
-            context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
-            context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
+            context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, this::flushStore);
+            context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, this::flushStore);
             store = context.getStateStore("aggStore");
         }
 
         @Override
-        public void process(final String key, final Long value) {
-            final Long oldValue = store.get(key);
-            if (oldValue == null || value > oldValue) {
-                store.put(key, value);
+        public void process(final Record<String, Long> record) {
+            final Long oldValue = store.get(record.key());
+            if (oldValue == null || record.value() > oldValue) {
+                store.put(record.key(), record.value());
             }
         }
 
-        private void flushStore() {
+        private void flushStore(final long timestamp) {
             final KeyValueIterator<String, Long> it = store.all();
             while (it.hasNext()) {
                 final KeyValue<String, Long> next = it.next();
-                context.forward(next.key, next.value);
+                context.forward(new Record<>(next.key, next.value, timestamp));

Review comment:
       Since we have to define a timestamp now, I'm showing the use of the punctuation time in the dev guide.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -83,30 +90,21 @@
      */
     StreamsMetrics metrics();
 
-    /**
-     * Registers and possibly restores the specified storage engine.
-     *
-     * @param store the storage engine
-     * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart
-     *
-     * @throws IllegalStateException If store gets registered after initialized is already finished
-     * @throws StreamsException if the store's change log does not contain the partition
-     */
-    void register(final StateStore store,
-                  final StateRestoreCallback stateRestoreCallback);
-

Review comment:
       Moved to the StateStoreContext.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -61,7 +84,9 @@
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void init(ProcessorContext context, StateStore root);
+    default void init(final StateStoreContext context, final StateStore root) {
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }

Review comment:
       This is the new API. Note the default implementation that delegates to the old API.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -140,76 +138,25 @@ Cancellable schedule(final Duration interval,
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);

Review comment:
       Migrated to the new Record argument.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
##########
@@ -29,7 +30,7 @@
 import static java.util.Objects.requireNonNull;
 import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
 
-public class ProcessorRecordContext implements RecordContext {
+public class ProcessorRecordContext implements RecordContext, RecordMetadata {

Review comment:
       Here's the implementation of RecordMetadata.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
##########
@@ -49,26 +50,38 @@ protected StateManager stateManager() {
         return stateManager;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {
         final StateStore store = stateManager.getGlobalStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public <KIn, VIn> void forward(final KIn key, final VIn value) {
+    public <K, V> void forward(final Record<K, V> record) {
         final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
         try {
             for (final ProcessorNode<?, ?, ?, ?> child : currentNode().children()) {
                 setCurrentNode(child);
-                ((ProcessorNode<KIn, VIn, ?, ?>) child).process(key, value);
+                ((ProcessorNode<K, V, ?, ?>) child).process(record);
             }
         } finally {
             setCurrentNode(previousNode);
         }
     }
 
+    @Override
+    public <K, V> void forward(final Record<K, V> record, final String childName) {
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in global processor context.");

Review comment:
       Just implementing the new APIs while preserving the existing patterns.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1230,10 +1232,10 @@ public void shouldNotShareHeadersBetweenPunctuateIterations() {
         task.completeRestoration();
 
         task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
-            task.processorContext().recordContext().headers().add("dummy", (byte[]) null);
+            task.processorContext().headers().add("dummy", (byte[]) null);

Review comment:
       This test was actually testing a slightly wrong thing: `recordContext` was never exposed to users, they would have accessed the headers as `processorContext.header()`. It's important here because I've refactored the internal code to set `recordContext` to `null` when there is no defined context (such as in a punctuation like here).

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
##########
@@ -57,11 +59,11 @@ public void setUp() {
     private void init() {
         EasyMock.expect(context.taskId()).andReturn(taskId);
         EasyMock.expect(context.recordCollector()).andReturn(collector);
-        inner.init(context, store);
+        inner.init((ProcessorContext) context, store);

Review comment:
       Just a quick note. We do still expect the inner store to have the old init method invoked because none of the wrapper stores are implementing the new init method, so they're using the default implementation that delegates to the old init method. I'm going to take care of that in a follow-on PR.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
##########
@@ -65,25 +66,19 @@ public void init(final ProcessorContext<KOut, VOut> context) {
             scheduleCancellable = context.schedule(
                 Duration.ofMillis(scheduleInterval),
                 punctuationType,
-                timestamp -> {
-                    if (punctuationType == PunctuationType.STREAM_TIME) {
-                        assertThat(context.timestamp(), is(timestamp));
-                    }
-                    assertThat(context.partition(), is(-1));
-                    assertThat(context.offset(), is(-1L));
-
-                    (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
-                        .add(timestamp);
-                });
+                (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)::add

Review comment:
       This can become a method reference now because those assertions on partition and offset are meaningless now.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
##########
@@ -148,16 +149,16 @@ public TopologyTestDriverTest(final boolean eosEnabled) {
         }
     }
 
-    private final static class Record {
+    private final static class TTDTestRecord {

Review comment:
       This was a bit funny to run into by this point in the implementation. It turns out we already had a class called "Record", and now we need to reference both of them in this test. I felt like it was more readable to just give this class a new name instead of referencing it by fully qualified name.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
##########
@@ -46,12 +46,11 @@
     default void init(final ProcessorContext<KOut, VOut> context) {}
 
     /**
-     * Process the record with the given key and value.
+     * Process the record. Note that record metadata is undefined in cases such as a forward call from a punctuator.
      *
-     * @param key the key for the record
-     * @param value the value for the record
+     * @param record the record to process
      */
-    void process(KIn key, VIn value);
+    void process(Record<KIn, VIn> record);

Review comment:
       The new Processor API (with Record) proposed in the KIP.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * State store context interface.
+ */
+public interface StateStoreContext {

Review comment:
       The new state store context proposed in the KIP.

##########
File path: checkstyle/suppressions.xml
##########
@@ -185,7 +185,7 @@
 
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              files="StreamThreadTest.java"/>
+              files="(StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>

Review comment:
       These tests now have a couple more imports, which pushed them over the line. It's possible that they'll drop below the line again after completing the transition to KIP-478. Otherwise, we should refactor these tests to comply with the limit. But we should do that separately.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -55,6 +52,16 @@
      */
     TaskId taskId();
 
+    /**
+     * The metadata of the record, if it is defined. Note that as long as the processor is
+     * receiving a record downstream of a Source (i.e., the current record is coming from an
+     * input topic), the metadata is defined. On the other hand, if a parent processor has
+     * registered a punctuator and called {@link ProcessorContext#forward(Record)} from that
+     * punctuator, then there is no record from an input topic, and therefore the metadata
+     * would be undefined.
+     */
+    Optional<RecordMetadata> recordMetadata();

Review comment:
       The new RecordMetadata context proposed in the KIP. Hopefully, the Javadoc is clear on why it's Optional.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    /**
+     * The full constructor, specifying all the attributes of the record.
+     *
+     * @param key The key of the record. May be null.
+     * @param value The value of the record. May be null.
+     * @param timestamp The timestamp of the record. May not be negative.
+     * @param headers The headers of the record. May be null, which will cause subsequent calls
+     *                to {@link this#headers()} to return a non-null, empty, {@link Headers} collection.
+     *
+     * @throws IllegalArgumentException if the timestamp is negative.
+     */
+    public Record(final K key, final V value, final long timestamp, final Headers headers) {
+        this.key = key;
+        this.value = value;
+        if (timestamp < 0) {
+            throw new StreamsException(
+                "Malformed Record",
+                new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
+            );
+        }
+        this.timestamp = timestamp;
+        this.headers = new RecordHeaders(headers);

Review comment:
       Note: each time we create a new Record, we copy the headers. This is an improvement over the current situation where there's no mutability barriers across the whole subtopology, so changes to headers in one processor can have unexpected effects on other processors that are very far away in the dependency diagram.
   
   However, it doesn't completely solve the problem: changes in children can still be visible to parents and siblings. @mjsax and I discussed an alternative option of providing a completely immutable implementation (copy on write) of Headers as a complete solution. But it also seems to be a pretty severe performance penalty. Instead, perhaps we can just document a safe pattern. E.g.,
   ```
   record = new Record(...)
   context.forward(record, "childA")
   record.headers().add(new header)
   // or
   record.withHeaders(record.headers().add(new header))
   context.forward(record, "childB")
   ```
   is unsafe because childA may modify the headers, affecting both the parent and childB. Instead, you should do something like:
   
   ```
   record1 = new Record(...)
   record2 = new Record(...)
   record2.headers().add(new header)
   context.forward(record1, "childA")
   context.forward(record2, "childB")
   ```
   Now, the headers for both children are completely independent objects.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -140,76 +138,25 @@ Cancellable schedule(final Duration interval,
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);
 
     /**
-     * Forwards a key/value pair to the specified downstream processors.
-     * Can be used to set the timestamp of the output record.
+     * Forwards a record to the specified child processor.
      *
-     * @param key key
-     * @param value value
-     * @param to the options to use when forwarding
+     * @param record The record to forward
+     * @param childName The name of the child processor to receive the record
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record, final String childName);
 
     /**
      * Requests a commit.
      */
     void commit();
 
-    /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the topic name
-     */
-    String topic();
-
-    /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the partition id
-     */
-    int partition();
-
-    /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the offset
-     */
-    long offset();
-
-    /**
-     * Returns the headers of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the headers
-     */
-    Headers headers();
-
-    /**
-     * Returns the current timestamp.
-     *
-     * <p> If it is triggered while processing a record streamed from the source processor,
-     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
-     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
-     *
-     * <p> If it is triggered while processing a record generated not from the source processor (for example,
-     * if this method is invoked from the punctuate call), timestamp is defined as the current
-     * task's stream time, which is defined as the largest timestamp of any record processed by the task.
-     *
-     * @return the timestamp
-     */
-    long timestamp();

Review comment:
       These are moved to Record.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -147,7 +148,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+            stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       These are scattered throughout this PR. It's just selecting the init method we want to invoke. It's only necessary because the `globalProcessorContext` here actually implements both `ProcessorContext` and `StateStoreContext`. This is only true of our internal contexts, so users will not face a similar need to change code.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
##########
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextReverseAdapter implements InternalProcessorContext {

Review comment:
       Don't need this adapter anymore, either.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -140,76 +138,25 @@ Cancellable schedule(final Duration interval,
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);
 
     /**
-     * Forwards a key/value pair to the specified downstream processors.
-     * Can be used to set the timestamp of the output record.
+     * Forwards a record to the specified child processor.
      *
-     * @param key key
-     * @param value value
-     * @param to the options to use when forwarding
+     * @param record The record to forward
+     * @param childName The name of the child processor to receive the record
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record, final String childName);
 
     /**
      * Requests a commit.
      */
     void commit();
 
-    /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the topic name
-     */
-    String topic();
-
-    /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the partition id
-     */
-    int partition();
-
-    /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
-     *
-     * @return the offset
-     */
-    long offset();

Review comment:
       These are moved to RecordMetadata

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##########
@@ -114,7 +117,7 @@ public void init(final InternalProcessorContext context) {
             maybeMeasureLatency(
                 () -> {
                     if (processor != null) {
-                        processor.init(ProcessorContextAdapter.adapt(context));
+                        processor.init((ProcessorContext<KOut, VOut>) context);

Review comment:
       Casting to add the generic params (the InternalProcessorContext is parameterized as `<Object, Object>`).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
##########
@@ -104,7 +105,13 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {
                     deserialized.headers());
             processorContext.setRecordContext(recordContext);
             processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
-            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(deserialized.key(), deserialized.value());
+            final Record<Object, Object> toProcess = new Record<>(
+                deserialized.key(),
+                deserialized.value(),
+                processorContext.timestamp(),
+                processorContext.headers()
+            );
+            ((SourceNode<Object, Object, Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);

Review comment:
       This is a pretty common pattern where we need to bridge the new and old APIs. We construct the "record" by filling in the timestamp and headers from the context.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -866,9 +867,9 @@ public void init(final ProcessorContext<String, String> context) {
                     }
 
                     @Override
-                    public void process(final String key, final String value) {
-                        if (value.length() % 2 == 0) {
-                            context.forward(key, key + value);
+                    public void process(final Record<String, String> record) {
+                        if (record.value().length() % 2 == 0) {
+                            context.forward(record.withValue(record.key() + record.value()));

Review comment:
       A good example of updating just the value for the child.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
##########
@@ -33,7 +34,9 @@
  * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
  * {@link ThreadCache}
  */
-public interface InternalProcessorContext extends ProcessorContext {
+public interface InternalProcessorContext
+    extends ProcessorContext, org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>, StateStoreContext {

Review comment:
       Thanks to this, all our internal Impls are suitable to pass in to any of the new APIs.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
##########
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.RecordContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
-
-/**
- * For internal use so we can update the {@link RecordContext} and current
- * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
- * {@link ThreadCache}
- */
-public interface InternalApiProcessorContext<KForward, VForward> extends ProcessorContext<KForward, VForward> {

Review comment:
       Don't need this anymore, since the InternalProcessorContext can now implement all of the new and old Contexts.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -742,8 +760,7 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
             throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix));
         }
 
-        updateProcessorContext(new StampedRecord(new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null),
-            timestamp), node, time.milliseconds());
+        updateProcessorContext(node, time.milliseconds(), null);

Review comment:
       Instead of setting a dummy context, we're now just setting the context to `null` aka "undefined".

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {

Review comment:
       The new Record class proposed in the KIP.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+public final class StoreToProcessorContextAdapter implements ProcessorContext {
+    private final StateStoreContext delegate;
+
+    public static ProcessorContext adapt(final StateStoreContext delegate) {
+        if (delegate instanceof ProcessorContext) {
+            return (ProcessorContext) delegate;
+        } else {
+            return new StoreToProcessorContextAdapter(delegate);
+        }
+    }

Review comment:
       This allows us to transparently delegate the new API to the old one for StateStore implementations. Our internal StateStoreContext implementations all implement both APIs, so they just get casted, while if you use a separate implementation of StateStoreContext (e.g., in unit tests), it'll get adapted, which works just as long as the underlying store implementation doesn't try to call forward or anything.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
##########
@@ -86,7 +86,7 @@ public void register(final StateStore store,
     }
 
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {

Review comment:
       Just implementing the interface.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -112,64 +114,69 @@ public void register(final StateStore store,
         stateManager().registerStore(store, stateRestoreCallback);
     }
 
-    /**
-     * @throws IllegalStateException if the task's record is null
-     */
     @Override
     public String topic() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
-        }
-
-        final String topic = recordContext.topic();
-
-        if (NONEXIST_TOPIC.equals(topic)) {
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For topic, the dummy value is `null`.

Review comment:
       The prior code was a bit misleading. I could not find _any_ code path where the context was actually null before, since we always initialized it with a "dummy context". This change simplifies the codebase by just moving the dummy values here and we now really do set the record context to `null` to (internally) signify when it is undefined.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
##########
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextAdapter<KForward, VForward>
-    implements ProcessorContext<KForward, VForward>, InternalApiProcessorContext<KForward, VForward> {

Review comment:
       We also don't need these adapters anymore.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
##########
@@ -23,6 +23,10 @@ public ToInternal() {
         super(To.all());
     }
 
+    public ToInternal(final To to) {
+        super(to);
+    }

Review comment:
       A copy constructor helped with the ProcessorContextImpl refactoring.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
##########
@@ -60,7 +61,7 @@ public void init(final ProcessorContext context) {
                     context.schedule(
                         Duration.ofMillis(1),
                         PunctuationType.WALL_CLOCK_TIME,
-                        timestamp -> context.forward(-1, (int) timestamp)
+                        timestamp -> context.forward(-1, (int) timestamp, To.all().withTimestamp(timestamp))

Review comment:
       The prior code here was actually relying on a strange effect in which we set the (undefined) processor context's timestamp to the punctuation time. I could preserve that behavior, but it looked like a bug to me.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.testutil;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+
+public final class ConsumerRecordUtil {
+    private ConsumerRecordUtil() {}
+
+    public static <K, V> ConsumerRecord<K, V> record(final String topic,
+                                                     final int partition,
+                                                     final long offset,
+                                                     final K key,
+                                                     final V value) {
+        // the no-time constructor in ConsumerRecord initializes the
+        // timestamp to -1, which is an invalid configuration. Here,
+        // we initialize it to 0.
+        return new ConsumerRecord<>(
+            topic,
+            partition,
+            offset,
+            0L,
+            TimestampType.CREATE_TIME,
+            0L,
+            0,
+            0,
+            key,
+            value
+        );
+    }

Review comment:
       This is the utility method I replaced the ConsumerRecord constructor with earlier in the PR.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -775,8 +775,8 @@ public void init(final ProcessorContext<String, String> context) {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value);
+        public void process(final Record<String, String> record) {
+            context.forward(record);

Review comment:
       just a simple passthrough

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+public interface RecordMetadata {

Review comment:
       The new metadata proposed in the KIP. Note that it's an interface because in reality, it's just going to be a view onto the ProcessorRecordContext.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
##########
@@ -127,7 +128,7 @@ public void shouldInitializeProcessorTopology() {
     @Test
     public void shouldProcessRecordsForTopic() {
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>(topic1, 1, 1, "foo".getBytes(), "bar".getBytes()));
+        globalStateTask.update(record(topic1, 1, 1, "foo".getBytes(), "bar".getBytes()));

Review comment:
       That constructor of ConsumerRecord set the timestamp to `-1`, which is now prohibited because we construct a Record before processing, and Record enforces no negative timestamps.
   
   This seems fine to me, since it would only happen in unit tests (as ConsumerRecords returned from the broker never have negative timestamps).

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -792,8 +792,8 @@ public void init(final ProcessorContext<String, String> context) {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value, To.all().withTimestamp(context.timestamp() + 10));
+        public void process(final Record<String, String> record) {
+            context.forward(record.withTimestamp(record.timestamp() + 10));

Review comment:
       An example of setting only the timestamp.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
##########
@@ -55,7 +56,7 @@ public void before() {
             collector,
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         context.setTime(0);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);

Review comment:
       Words cannot express how tired of this change I was by this point.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -466,10 +467,11 @@ public void init(final InternalProcessorContext context) {
                 this.context = context;
                 super.init(context);
             }
+
             @Override
-            public void process(final Integer key, final Integer value) {
-                if (key % 2 == 0) {
-                    context.forward(key, value);
+            public void process(final Record<Integer, Integer> record) {
+                if (record.key() % 2 == 0) {
+                    context.forward(record);

Review comment:
       Example of filtering but otherwise passing the record through.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
##########
@@ -81,14 +84,9 @@ public void shouldThrowNullPointerOnRegisterIfStateStoreIsNull() {
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnTopicIfNoRecordContext() {
+    public void shouldReturnNullTopicIfNoRecordContext() {

Review comment:
       These tests were enforcing a behavior that would never have actually happened in practice. Since I changed these methods to return the dummy values when the context is undefined, these tests also have to change.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -670,9 +671,26 @@ public boolean process(final long wallClockTime) {
 
             log.trace("Start processing one record [{}]", record);
 
-            updateProcessorContext(record, currNode, wallClockTime);
+            updateProcessorContext(
+                currNode,
+                wallClockTime,
+                new ProcessorRecordContext(
+                    record.timestamp,
+                    record.offset(),
+                    record.partition(),
+                    record.topic(),
+                    record.headers()
+                )
+            );
+
             maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
-            maybeMeasureLatency(() -> currNode.process(record.key(), record.value()), time, processLatencySensor);
+            final Record<Object, Object> toProcess = new Record<>(
+                record.key(),
+                record.value(),
+                processorContext.timestamp(),
+                processorContext.headers()

Review comment:
       This is pulling out the timestamp and headers that we just set a few lines earlier.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -967,11 +968,11 @@ public void init(final ProcessorContext<String, String> context) {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
+                public void process(final Record<String, String> record) {
                     final KeyValueStore<String, Long> kvStore = context.getStateStore(storeName);
-                    kvStore.put(key, 5L);
+                    kvStore.put(record.key(), 5L);
 
-                    context.forward(key, "5");
+                    context.forward(record.withValue("5"));

Review comment:
       You're going to see a lot of these in the tests.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link Processor}
+ * or a record to forward to downstream processors via {@link ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    /**
+     * The full constructor, specifying all the attributes of the record.
+     *
+     * @param key The key of the record. May be null.
+     * @param value The value of the record. May be null.
+     * @param timestamp The timestamp of the record. May not be negative.
+     * @param headers The headers of the record. May be null, which will cause subsequent calls
+     *                to {@link this#headers()} to return a non-null, empty, {@link Headers} collection.
+     *
+     * @throws IllegalArgumentException if the timestamp is negative.
+     */
+    public Record(final K key, final V value, final long timestamp, final Headers headers) {
+        this.key = key;
+        this.value = value;
+        if (timestamp < 0) {
+            throw new StreamsException(
+                "Malformed Record",
+                new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
+            );
+        }

Review comment:
       We used to check this only in SinkNode, but it seems better to fail fast since we actually have the opportunity to do so now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] pgwhalen commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
pgwhalen commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498610219



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -55,6 +52,16 @@
      */
     TaskId taskId();
 
+    /**
+     * The metadata of the record, if it is defined. Note that as long as the processor is

Review comment:
       (I'll offer an opinion here because I stressed the importance of it on the mailing list, though I do think this is already pretty good.  Feel free to take it or leave it, it's just a rewording that may or may not add clarity)
   
   > Note that as long as the processor is receiving a record downstream of a Source
   
   Isn't every record received downstream of a source one way or another?  IMO the up front emphasis should be on _how_ the record came from the upstream processor.  My attempt:
   
   > The record metadata is defined if the record currently being processed was passed through parent processor(s) directly from a Source.  It is undefined if the record was forwarded from within a Punctuator.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498904526



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -55,6 +52,16 @@
      */
     TaskId taskId();
 
+    /**
+     * The metadata of the record, if it is defined. Note that as long as the processor is

Review comment:
       Thanks, @pgwhalen !
   
   I agree, the docs I wrote were a little too terse. How about this:
   
   ```java
       /**
        * The metadata of the source record, if is one. Processors may be invoked to
        * process a source record from an input topic, to run a scheduled punctuation
        * (see {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)} ),
        * or because a parent processor called {@link ProcessorContext#forward(Record)}.
        * <p>
        * In the case of a punctuation, there is no source record, so this metadata would be
        * undefined. Note that when a punctuator invokes {@link ProcessorContext#forward(Record)},
        * downstream processors will receive the forwarded record as a regular
        * {@link Processor#process(Record)} invocation. In other words, it wouldn't be apparent to
        * downstream processors whether or not the record being processed came from an input topic
        * or punctuation and therefore whether or not this metadata is defined. This is why
        * the return type of this method is {@link Optional}.
        * <p>
        * If there is any possibility of punctuators upstream, any access
        * to this field should consider the case of
        * "{@code recordMetadata().isPresent() == false}".
        * Of course, it would be safest to always guard this condition.
        */
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498604821



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -135,16 +132,17 @@ public void logChange(final String storeName,
      * @throws StreamsException if an attempt is made to access this state store from an unknown node
      * @throws UnsupportedOperationException if the current streamTask type is standby
      */
+    @SuppressWarnings("unchecked")

Review comment:
       It's because we're returning a subtype of StateStore now, but the stores are still internally not typed, so someone still has to cast it.
   
   For example, here's what the old API was like:
   ```java
   KeyValueStore<String, Integer> store = (KeyValueStore<String, Integer>) context.getStateStore("asdf");
   ```
   The cast is in the user's code.
   
   Now, it will be like this:
   ```java
   KeyValueStore<String, Integer> store = context.getStateStore("asdf");
   ```
   The return type is resolved by the type system, and the cast moves into our code.
   
   Just to clarify, there's no extra safety here, it's a cast either way. It's just more convenient for callers not to have to cast on their end.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9361: KAFKA-10535: Split ProcessorContext into Processor/StateStore/Record Contexts

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498605578



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -135,16 +132,17 @@ public void logChange(final String storeName,
      * @throws StreamsException if an attempt is made to access this state store from an unknown node
      * @throws UnsupportedOperationException if the current streamTask type is standby
      */
+    @SuppressWarnings("unchecked")

Review comment:
       Got it, thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org