You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/19 13:25:20 UTC

[GitHub] [kafka] vcrfxia commented on a change in pull request #10720: MINOR: preserve timestamp when getting value from upstream state store

vcrfxia commented on a change in pull request #10720:
URL: https://github.com/apache/kafka/pull/10720#discussion_r635231680



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
##########
@@ -141,16 +144,32 @@ public void close() {
 
         @Override
         public void init(final ProcessorContext context) {
+            internalProcessorContext = (InternalProcessorContext) context;
             parentGetter.init(context);
             valueTransformer.init(new ForwardingDisabledProcessorContext(context));
         }
 
         @Override
         public ValueAndTimestamp<V1> get(final K key) {
             final ValueAndTimestamp<V> valueAndTimestamp = parentGetter.get(key);
-            return ValueAndTimestamp.make(
+

Review comment:
       To confirm my understanding, the reason only `KTableTransformValuesGetter` and not any of the other `KTableValueGetter` implementations are impacted by this bug is because only `KTableTransformValuesGetter` gives users access to the `ProcessorContext`. Is that true?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
##########
@@ -141,16 +144,32 @@ public void close() {
 
         @Override
         public void init(final ProcessorContext context) {
+            internalProcessorContext = (InternalProcessorContext) context;
             parentGetter.init(context);
             valueTransformer.init(new ForwardingDisabledProcessorContext(context));
         }
 
         @Override
         public ValueAndTimestamp<V1> get(final K key) {
             final ValueAndTimestamp<V> valueAndTimestamp = parentGetter.get(key);
-            return ValueAndTimestamp.make(
+
+            final ProcessorRecordContext currentContext = internalProcessorContext.recordContext();
+
+            internalProcessorContext.setRecordContext(new ProcessorRecordContext(

Review comment:
       Are there ever situations where users would want the old behavior (to have access to the `ProcessorContext` for the record that triggered the lookup, rather than the context for the record that's being looked up)? For example, if the topic name is relevant for the transformer and all records (including the current one that triggered the lookup and the one being processed) are from the same topic, then the old behavior gives access to the topic name but this new behavior doesn't.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org