Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/19 22:10:15 UTC

[GitHub] [kafka] mjsax opened a new pull request #10731: KAFKA-12815: Update JavaDocs of ValueTransformerWithKey

mjsax opened a new pull request #10731:
URL: https://github.com/apache/kafka/pull/10731


   Follow-up to #10720


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10731: KAFKA-12815: Update JavaDocs of ValueTransformerWithKey

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10731:
URL: https://github.com/apache/kafka/pull/10731#discussion_r640004045



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
##########
@@ -75,14 +75,20 @@
     void init(final ProcessorContext context);
 
     /**
-     * Transform the given [key and ]value to a new value.
+     * Transform the given [key and] value to a new value.
      * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerWithKeySupplier, String...)
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
-     * Note, that using {@link ProcessorContext#forward(Object, Object)} or
+     * Note that using {@link ProcessorContext#forward(Object, Object)} or
      * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and
      * will result in an {@link StreamsException exception}.
+     * <p>
+     * Note that if a {@code ValueTransformerWithKey} is used in a {@link KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (or any other overload of {@code KTable#transformValues(...)}) operation,
+     * then the provided {@link ProcessorContext} from {@link #init(ProcessorContext)}
+     * does not guarantee that all context information will be available when {@code transform()}
+     * is executed.

Review comment:
       Well, for `KTable#transformValues` those things don't apply because there is no out-of-band transformation upstream.
   
   It's the DSL, not the PAPI. It might make sense to add this information in general, but that seems to be out of scope for this PR?







[GitHub] [kafka] mjsax commented on a change in pull request #10731: KAFKA-12815: Update JavaDocs of ValueTransformerWithKey

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10731:
URL: https://github.com/apache/kafka/pull/10731#discussion_r644507242



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
##########
@@ -158,7 +158,11 @@ public void init(final ProcessorContext context) {
             internalProcessorContext.setRecordContext(new ProcessorRecordContext(
                 valueAndTimestamp == null ? UNKNOWN : valueAndTimestamp.timestamp(),
                 -1L, // we don't know the original offset
-                currentContext.partition(),
+                // technically, we know the partition, but in the new `api.Processor` class,
+                // we move to `RecordMetadata`, which would be `null` for this case, and thus
+                // we won't have the partition information, so it's better not to provide it
+                // here either, to avoid introducing a regression later on
+                -1,

Review comment:
       @vvcephei I think it's better to change this...

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
##########
@@ -75,14 +75,21 @@
     void init(final ProcessorContext context);
 
     /**
-     * Transform the given [key and ]value to a new value.
+     * Transform the given [key and] value to a new value.
      * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerWithKeySupplier, String...)
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
-     * Note, that using {@link ProcessorContext#forward(Object, Object)} or
+     * Note that using {@link ProcessorContext#forward(Object, Object)} or
      * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and
      * will result in an {@link StreamsException exception}.
+     * <p>
+     * Note that if a {@code ValueTransformerWithKey} is used in a {@link KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (or any other overload of {@code KTable#transformValues(...)}) operation,
+     * then the provided {@link ProcessorContext} from {@link #init(ProcessorContext)}
+     * does not guarantee that all context information will be available when {@code transform()}
+     * is executed, as it might be executed "out-of-band" due to some internal optimizations
+     * applied by the Kafka Streams DSL.

Review comment:
       @vvcephei Updated the text a little bit.
   
   Don't think we should talk about punctuations because they do not apply to the "ValueGetter" optimization case that we fixed. Did a follow-up PR instead: https://github.com/apache/kafka/pull/10810
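To make the "out-of-band" case more concrete, the following is a simplified, self-contained Java model of the value-getter path touched by the `KTableTransformValues` diff above: when a downstream lookup (for example, from a join) asks for a transformed value, the getter swaps in a placeholder record context, invokes the user's transformer, and then restores the context of the record that is actually being processed. The names `ValueGetterModel`, `RecordContext`, `Stored`, and `lookup` are hypothetical stand-ins rather than Kafka Streams internals; the placeholder timestamp, offset, and partition follow the diff, while the `null` topic is an assumption taken from the review discussion.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class ValueGetterModel {

    // Hypothetical stand-in for the record metadata a transformer can observe.
    record RecordContext(long timestamp, long offset, int partition, String topic) { }

    // Hypothetical stand-in for a stored upstream value and its timestamp.
    record Stored(String value, long timestamp) { }

    static final long UNKNOWN = -1L;

    // Context read by the "user transformer"; temporarily replaced by lookup().
    static RecordContext current;

    // Simplified lookup path: invoked out-of-band (e.g. by a downstream join),
    // not because a record from this operator's input is being processed.
    static String lookup(Map<String, Stored> parentStore,
                         String key,
                         BiFunction<String, String, String> transformer) {
        Stored stored = parentStore.get(key);
        RecordContext saved = current;
        // Placeholder context: only the stored timestamp is known.
        current = new RecordContext(
            stored == null ? UNKNOWN : stored.timestamp(),
            -1L,   // original offset is unknown
            -1,    // partition deliberately not provided, as in the diff
            null); // source topic is unknown (assumption from the review thread)
        try {
            return transformer.apply(key, stored == null ? null : stored.value());
        } finally {
            current = saved; // restore the context of the record being processed
        }
    }

    public static void main(String[] args) {
        Map<String, Stored> store = new HashMap<>();
        store.put("k1", new Stored("v1", 42L));
        current = new RecordContext(100L, 7L, 3, "orders");

        // The transformer appends whatever partition it observes.
        String result = lookup(store, "k1", (k, v) -> v + "@p" + current.partition());

        System.out.println(result);              // v1@p-1
        System.out.println(current.partition()); // 3 (restored after the lookup)
    }
}
```

The key point the model illustrates is that the transformer cannot tell, from the call alone, whether it runs in-band or during such a lookup; only the context values differ.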







[GitHub] [kafka] vvcephei commented on a change in pull request #10731: KAFKA-12815: Update JavaDocs of ValueTransformerWithKey

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10731:
URL: https://github.com/apache/kafka/pull/10731#discussion_r639778036



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
##########
@@ -75,14 +75,20 @@
     void init(final ProcessorContext context);
 
     /**
-     * Transform the given [key and ]value to a new value.
+     * Transform the given [key and] value to a new value.
      * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerWithKeySupplier, String...)
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
-     * Note, that using {@link ProcessorContext#forward(Object, Object)} or
+     * Note that using {@link ProcessorContext#forward(Object, Object)} or
      * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and
      * will result in an {@link StreamsException exception}.
+     * <p>
+     * Note that if a {@code ValueTransformerWithKey} is used in a {@link KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (or any other overload of {@code KTable#transformValues(...)}) operation,
+     * then the provided {@link ProcessorContext} from {@link #init(ProcessorContext)}
+     * does not guarantee that all context information will be available when {@code transform()}
+     * is executed.

Review comment:
       We might as well formally specify which things may be missing. Otherwise, it's not clear what users should do to guard their code.
   ```suggestion
        * does not guarantee that all record context information will be available when
        * {@code transform()} is executed. For example, {@code transform()} may be
        * invoked as a result of upstream punctuations or other out-of-band operations,
        * in which case there will be no headers, topic name, timestamp, or offset available.
        * When those properties are absent, the following placeholder values will
        * be filled in: empty headers, `null` topic name, `-1` timestamp, and `-1` offset.
        * Implementations of this interface should guard against this case.
   ```
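The suggestion above enumerates the placeholder values a transformer may see. A minimal sketch of how user code could guard against them, assuming those placeholders (a `null` topic and a `-1` offset, plus the `-1` partition introduced in the `KTableTransformValues` diff of this PR): the helper takes plain values so it can be unit-tested without Kafka on the classpath. `RecordContextGuard`, `isPlaceholder`, and `source` are hypothetical names, and because the later revision of the JavaDoc in this thread does not list the placeholder values, treat them as an assumption rather than a documented contract.

```java
import java.util.Optional;

public final class RecordContextGuard {

    private RecordContextGuard() { }

    // Assumed placeholders (from the review discussion): null topic, -1 offset.
    // Returns true if the metadata looks like an out-of-band placeholder context.
    public static boolean isPlaceholder(String topic, long offset) {
        return topic == null || offset < 0;
    }

    // Describes the source record only when real metadata is available;
    // a negative partition is also treated as "unknown".
    public static Optional<String> source(String topic, int partition, long offset) {
        if (isPlaceholder(topic, offset) || partition < 0) {
            return Optional.empty();
        }
        return Optional.of(topic + "-" + partition + "@" + offset);
    }

    public static void main(String[] args) {
        // Regular, in-band invocation: all metadata present.
        System.out.println(source("orders", 2, 17L).orElse("unknown")); // orders-2@17
        // Out-of-band invocation with placeholder values.
        System.out.println(source(null, -1, -1L).orElse("unknown"));    // unknown
    }
}
```

Inside a real `ValueTransformerWithKey`, the helper could be fed from the `ProcessorContext` captured in `init()`, for example `RecordContextGuard.source(context.topic(), context.partition(), context.offset()).orElse("unknown")`. Keeping the check in one pure function means every transformer handles the out-of-band case the same way.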







[GitHub] [kafka] mjsax merged pull request #10731: KAFKA-12815: Update JavaDocs of ValueTransformerWithKey

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #10731:
URL: https://github.com/apache/kafka/pull/10731


   








[GitHub] [kafka] guozhangwang commented on a change in pull request #10731: KAFKA-12815: Update JavaDocs of ValueTransformerWithKey

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10731:
URL: https://github.com/apache/kafka/pull/10731#discussion_r645869785



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
##########
@@ -158,7 +158,11 @@ public void init(final ProcessorContext context) {
             internalProcessorContext.setRecordContext(new ProcessorRecordContext(
                 valueAndTimestamp == null ? UNKNOWN : valueAndTimestamp.timestamp(),
                 -1L, // we don't know the original offset
-                currentContext.partition(),
+                // technically, we know the partition, but in the new `api.Processor` class,
+                // we move to `RecordMetadata`, which would be `null` for this case, and thus
+                // we won't have the partition information, so it's better not to provide it
+                // here either, to avoid introducing a regression later on
+                -1,

Review comment:
       SGTM, thanks @mjsax for bringing this to our attention.







[GitHub] [kafka] vvcephei commented on a change in pull request #10731: KAFKA-12815: Update JavaDocs of ValueTransformerWithKey

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10731:
URL: https://github.com/apache/kafka/pull/10731#discussion_r645017943



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
##########
@@ -158,7 +158,11 @@ public void init(final ProcessorContext context) {
             internalProcessorContext.setRecordContext(new ProcessorRecordContext(
                 valueAndTimestamp == null ? UNKNOWN : valueAndTimestamp.timestamp(),
                 -1L, // we don't know the original offset
-                currentContext.partition(),
+                // technically, we know the partition, but in the new `api.Processor` class,
+                // we move to `RecordMetadata`, which would be `null` for this case, and thus
+                // we won't have the partition information, so it's better not to provide it
+                // here either, to avoid introducing a regression later on
+                -1,

Review comment:
       Good point, @mjsax .

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
##########
@@ -75,14 +75,21 @@
     void init(final ProcessorContext context);
 
     /**
-     * Transform the given [key and ]value to a new value.
+     * Transform the given [key and] value to a new value.
      * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerWithKeySupplier, String...)
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
-     * Note, that using {@link ProcessorContext#forward(Object, Object)} or
+     * Note that using {@link ProcessorContext#forward(Object, Object)} or
      * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and
      * will result in an {@link StreamsException exception}.
+     * <p>
+     * Note that if a {@code ValueTransformerWithKey} is used in a {@link KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (or any other overload of {@code KTable#transformValues(...)}) operation,
+     * then the provided {@link ProcessorContext} from {@link #init(ProcessorContext)}
+     * does not guarantee that all context information will be available when {@code transform()}
+     * is executed, as it might be executed "out-of-band" due to some internal optimizations
+     * applied by the Kafka Streams DSL.

Review comment:
       Thanks!







[GitHub] [kafka] showuon commented on a change in pull request #10731: KAFKA-12815: Update JavaDocs of ValueTransformerWithKey

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10731:
URL: https://github.com/apache/kafka/pull/10731#discussion_r636606646



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
##########
@@ -75,14 +75,20 @@
     void init(final ProcessorContext context);
 
     /**
-     * Transform the given [key and ]value to a new value.
+     * Transform the given [key and] value to a new value.
      * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerWithKeySupplier, String...)
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
-     * Note, that using {@link ProcessorContext#forward(Object, Object)} or
+     * Note that using {@link ProcessorContext#forward(Object, Object)} or
      * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and
      * will result in an {@link StreamsException exception}.
+     * <p>
+     * Note that if a {@code ValueTransformerWithKey} is used in a {@link KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (or any other overload of {@code KTable#transformValues(...)}) operation,
+     * that the provided {@link ProcessorContext} from {@link #init(ProcessorContext)}

Review comment:
       is this a typo? 
   if a 'xxx' is used in a 'yyy' operation, **that** the provided 'zzz' from 'aaa' doesn't guarantee....
   
   Maybe replace `that` into `then`?
   








[GitHub] [kafka] vvcephei commented on a change in pull request #10731: KAFKA-12815: Update JavaDocs of ValueTransformerWithKey

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10731:
URL: https://github.com/apache/kafka/pull/10731#discussion_r644196594



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
##########
@@ -75,14 +75,20 @@
     void init(final ProcessorContext context);
 
     /**
-     * Transform the given [key and ]value to a new value.
+     * Transform the given [key and] value to a new value.
      * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerWithKeySupplier, String...)
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
-     * Note, that using {@link ProcessorContext#forward(Object, Object)} or
+     * Note that using {@link ProcessorContext#forward(Object, Object)} or
      * {@link ProcessorContext#forward(Object, Object, To)} is not allowed within {@code transform} and
      * will result in an {@link StreamsException exception}.
+     * <p>
+     * Note that if a {@code ValueTransformerWithKey} is used in a {@link KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (or any other overload of {@code KTable#transformValues(...)}) operation,
+     * then the provided {@link ProcessorContext} from {@link #init(ProcessorContext)}
+     * does not guarantee that all context information will be available when {@code transform()}
+     * is executed.

Review comment:
       All I was trying to do was characterize why the context information might be missing. If there are _no_ out-of-band operations, then there should never be missing context.









