You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/03 06:04:09 UTC

[GitHub] [kafka] mjsax opened a new pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

mjsax opened a new pull request #10810:
URL: https://github.com/apache/kafka/pull/10810


   Follow up to #10731
   
   I am not totally happy with this PR yet, and we might need to descope or split it up. While working on the PR, I figured that headers should actually never be `null` but I am not sure if we can easily change it. Especially for the public test helpers...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r645024604



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -319,7 +321,7 @@ public void setRecordMetadata(final String topic,
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);

Review comment:
       If you want to introduce the invariant without introducing an NPE regression, you could just coerce a null:
   
   ```suggestion
           this.headers = headers == null ? new RecordHeaders() : headers;
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jlprat commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
jlprat commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647670096



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -53,9 +53,9 @@
     TaskId taskId();
 
     /**
-     * The metadata of the source record, if is one. Processors may be invoked to
+     * Return the metadata of the current record if available. Processors may be invoked to

Review comment:
       ```suggestion
        * Returns the metadata of the current record if available. Processors may be invoked to
   ```
   An `s` is missing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647126680



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -319,7 +321,7 @@ public void setRecordMetadata(final String topic,
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);

Review comment:
       Thinking about is more, it might be best to keep the code as is, and allow `null`.
     - adding the new check might break existing test code
     - converting `null` to empty header might also break tests
     - users don't have a good way to pass in headers, as `Headers` is an interface (and there is no other public API they could use to create an object)
     - it was not a problem so far, and long term we want to deprecate this "old" API anyway (so we might want to avoid a case of pre-mature optimization)
   
   Of course, there is still this small gap between actual runtime code and the mock -- in the runtime, we don't ever return `null` and thus if the mock might return `null`, users cannot test their production code without an actually unnecessary null-check... but maybe that is acceptable. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jlprat commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
jlprat commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647120250



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -158,32 +165,52 @@ Cancellable schedule(final Duration interval,
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the topic name of the current input record; could be {@code null} if it is not

Review comment:
       All good then




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r645023265



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -158,32 +165,52 @@ Cancellable schedule(final Duration interval,
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the topic name
      */
     String topic();
 
     /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback the record won't have an associated partition id.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the partition id
      */
     int partition();
 
     /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the offset of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback, the record won't have an associated offset.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the headers of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the headers of the current input record.

Review comment:
       I see what you mean. I think it's worth providing a little more "context."
   
   Even in the punctuation case, it is obvious when you're specifically thinking about the relationship between upstream punctuation and downstream processors' contexts, but if you have a large Streams topology, maintained by a large team, you can easily have a situation where one person adds a punctuation that interacts poorly with logic far downstream, primarily maintained by other people.
   
   In that case, the downstream folks might be encountering a situation where they can't figure out why the headers are empty sometimes. In those cases, a little bread-crumb that says "this might be empty if..." can save someone hours or days of debugging.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r644509797



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -158,32 +165,52 @@ Cancellable schedule(final Duration interval,
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the topic name
      */
     String topic();
 
     /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback the record won't have an associated partition id.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the partition id
      */
     int partition();
 
     /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the offset of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback, the record won't have an associated offset.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the headers of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the headers of the current input record.

Review comment:
       While I think `headers` should never be `null`, they could still be empty. Not sure to what extend this must be documented? For punctuation "forwards" it seems obvious (similar if one would not set a key or value...), and for the new `Record` api it's even more obvious. For the `KTable#transformValues` "ValueGetter" case, it's a little different though. Might be worth to repeat here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
##########
@@ -43,12 +43,11 @@ public ProcessorRecordContext(final long timestamp,
                                   final int partition,
                                   final String topic,
                                   final Headers headers) {
-
         this.timestamp = timestamp;
         this.offset = offset;
         this.topic = topic;
         this.partition = partition;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);

Review comment:
       I think, for the internal change, this is actually ok. (I am still ok to split this into a separate PR, and keep this on a JavaDocs only PR).

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
##########
@@ -43,16 +43,12 @@
 
     private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
     private NamedCache cache;
-    private Metrics innerMetrics;
-    private StreamsMetricsImpl metrics;
-    private final String taskIDString = "0.0";
-    private final String underlyingStoreName = "storeName";

Review comment:
       Just some side cleanup in this test.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
##########
@@ -43,16 +43,12 @@
 
     private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
     private NamedCache cache;
-    private Metrics innerMetrics;
-    private StreamsMetricsImpl metrics;
-    private final String taskIDString = "0.0";
-    private final String underlyingStoreName = "storeName";

Review comment:
       Just some additional side cleanup in this test.

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -319,7 +321,7 @@ public void setRecordMetadata(final String topic,
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);

Review comment:
       This change is a little tricky... But if we consider it a bug-fix, it might still be ok?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r644510969



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
##########
@@ -43,16 +43,12 @@
 
     private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
     private NamedCache cache;
-    private Metrics innerMetrics;
-    private StreamsMetricsImpl metrics;
-    private final String taskIDString = "0.0";
-    private final String underlyingStoreName = "storeName";

Review comment:
       Just some side cleanup in this test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647115686



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -158,32 +165,52 @@ Cancellable schedule(final Duration interval,
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the topic name of the current input record; could be {@code null} if it is not

Review comment:
       Well, yes and no. This PR is only documenting the status quo of the current (but "old") API.
   
   There is https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API that introduces a new API, including:
   ```
   interface ProcessorContext<K, V> { // this is a new class with same name but located in different/new package
     Optional<RecordMetadata> recordMetadata();
   }
   
   The KIP is not fully implemented yet, and thus the current API is not yet deprecated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647166328



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
##########
@@ -17,39 +17,94 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 
 /**
  * The context associated with the current record being processed by
  * an {@link Processor}
  */
 public interface RecordContext {
+
     /**
-     * @return  The offset of the original record received from Kafka;
-     *          could be -1 if it is not available
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the topic name
      */
-    long offset();
+    String topic();
 
     /**
-     * @return  The timestamp extracted from the record received from Kafka;
-     *          could be -1 if it is not available
+     * Returns the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated partition id.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the partition id
      */
-    long timestamp();
+    int partition();
 
     /**
-     * @return  The topic the record was received on;
-     *          could be null if it is not available
+     * Returns the offset of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated offset.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the offset
      */
-    String topic();
+    long offset();
 
     /**
-     * @return  The partition the record was received on;
-     *          could be -1 if it is not available
+     * Returns the current timestamp.

Review comment:
       Should we mention the timestamp could be -1 if it's not available as [previous doc](https://github.com/apache/kafka/pull/10810/files#diff-e49dc368634ce1745441b926e5327a51f5e168d6deffc8b7acc5c3483a1431f5L33):
   
   > @return  The timestamp extracted from the record received from Kafka; could be -1 if it is not available

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
##########
@@ -28,6 +29,16 @@
     /**
      * Perform the scheduled periodic operation.
      *
+     * <p> If this method accesses {@link ProcessorContext} or
+     * {@link org.apache.kafka.streams.processor.api.ProcessorContext}, record metadata like topic,
+     * partition, and offset or {@link org.apache.kafka.streams.processor.api.RecordMetadata} won't
+     * be available.
+     *
+     * <p> Furthermore, for any record that is sent downstream via {@link ProcessorContext#forward(Object, Object)}
+     * or {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}, there
+     * won't be any record metadata. If {@link ProcessorContext#forward(Object, Object)} is use,

Review comment:
       typo: If {@link ProcessorContext#forward(Object, Object)} is **use** -> **used**

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
##########
@@ -17,39 +17,94 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 
 /**
  * The context associated with the current record being processed by
  * an {@link Processor}
  */
 public interface RecordContext {
+
     /**
-     * @return  The offset of the original record received from Kafka;
-     *          could be -1 if it is not available
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the topic name
      */
-    long offset();
+    String topic();
 
     /**
-     * @return  The timestamp extracted from the record received from Kafka;
-     *          could be -1 if it is not available
+     * Returns the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated partition id.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the partition id
      */
-    long timestamp();
+    int partition();
 
     /**
-     * @return  The topic the record was received on;
-     *          could be null if it is not available
+     * Returns the offset of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated offset.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the offset
      */
-    String topic();
+    long offset();
 
     /**
-     * @return  The partition the record was received on;
-     *          could be -1 if it is not available
+     * Returns the current timestamp.
+     *
+     * <p> If it is triggered while processing a record streamed from the source processor,
+     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.

Review comment:
       Is the 2 ConsumerRecord `{@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}` expected?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647662697



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
##########
@@ -16,19 +16,51 @@
  */
 package org.apache.kafka.streams.processor.api;
 
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+
 public interface RecordMetadata {
     /**
-     * @return  The topic of the original record received from Kafka
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.

Review comment:
       Cool. Works for me.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r644509797



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -158,32 +165,52 @@ Cancellable schedule(final Duration interval,
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the topic name
      */
     String topic();
 
     /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback the record won't have an associated partition id.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the partition id
      */
     int partition();
 
     /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the offset of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback, the record won't have an associated offset.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the headers of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the headers of the current input record.

Review comment:
       While I think `headers` should never be `null`, they could still be empty. Not sure to what extend this must be documented? For punctuation "forwards" it seems obvious (similar if one would not set a key or value...), and for the new `Record` api it's even more obvious. For the `KTable#transformValues` "ValueGetter" case, it's a little different though. Might be worth to repeat here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r644510969



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
##########
@@ -43,16 +43,12 @@
 
     private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
     private NamedCache cache;
-    private Metrics innerMetrics;
-    private StreamsMetricsImpl metrics;
-    private final String taskIDString = "0.0";
-    private final String underlyingStoreName = "storeName";

Review comment:
       Just some additional side cleanup in this test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r644511461



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -319,7 +321,7 @@ public void setRecordMetadata(final String topic,
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);

Review comment:
       This change is a little tricky... But if we consider it a bug-fix, it might still be ok?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647115686



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -158,32 +165,52 @@ Cancellable schedule(final Duration interval,
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the topic name of the current input record; could be {@code null} if it is not

Review comment:
       Well, yes and no. This PR is only documenting the status quo of the current (but "old") API.
   
   There is https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API that introduces a new API, including:
   ```
   interface ProcessorContext<K, V> { // this is a new class with same name but located in different/new package
     Optional<RecordMetadata> recordMetadata();
   }
   ```
   The KIP is not fully implemented yet, and thus the current API is not yet deprecated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647126680



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -319,7 +321,7 @@ public void setRecordMetadata(final String topic,
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);

Review comment:
       Thinking about is more, it might be best to keep the code as is, and allow `null`.
     - adding the new check might break existing test code
     - converting `null` to empty header might also break tests
     - users don't have a good way to pass in headers, as `Headers` is an interface (and there is no other public API they could use to create an object)
     - it was not a problem so far, and long term we want to deprecate this "old" API anyway (so we might want to avoid a case of pre-mature optimization)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647122650



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
##########
@@ -16,19 +16,51 @@
  */
 package org.apache.kafka.streams.processor.api;
 
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+
 public interface RecordMetadata {
     /**
-     * @return  The topic of the original record received from Kafka
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.

Review comment:
       @vvcephei Thinking about this more, could we guarantee that we never return `null` on those methods (and thus can keep the JavaDocs simple)? If we don't have metadata, `Optional<RecordMetadata>` should be empty to begin with (at least ideally)?
   
   But I was not sure given the current implementation. `AbstractProcessorContext` implement this method like
   ```
       @Override
       public Optional<RecordMetadata> recordMetadata() {
           return Optional.ofNullable(recordContext);
       }
   ``` 
   
   Or would `recordContext` be guaranteed to be `null` if the new API is used? (For this case, as `KTable#transformValues()` is using the "old" API, we might no need to document anything (or should we document it anyway, to be forward looking?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r644510383



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
##########
@@ -43,12 +43,11 @@ public ProcessorRecordContext(final long timestamp,
                                   final int partition,
                                   final String topic,
                                   final Headers headers) {
-
         this.timestamp = timestamp;
         this.offset = offset;
         this.topic = topic;
         this.partition = partition;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);

Review comment:
       I think, for the internal change, this is actually ok. (I am still ok to split this into a separate PR, and keep this on a JavaDocs only PR).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647702594



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
##########
@@ -17,39 +17,94 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 
 /**
  * The context associated with the current record being processed by
  * an {@link Processor}
  */
 public interface RecordContext {
+
     /**
-     * @return  The offset of the original record received from Kafka;
-     *          could be -1 if it is not available
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the topic name
      */
-    long offset();
+    String topic();
 
     /**
-     * @return  The timestamp extracted from the record received from Kafka;
-     *          could be -1 if it is not available
+     * Returns the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated partition id.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the partition id
      */
-    long timestamp();
+    int partition();
 
     /**
-     * @return  The topic the record was received on;
-     *          could be null if it is not available
+     * Returns the offset of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated offset.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the offset
      */
-    String topic();
+    long offset();
 
     /**
-     * @return  The partition the record was received on;
-     *          could be -1 if it is not available
+     * Returns the current timestamp.

Review comment:
       For `RecordContext` interface, from my understanding it cannot be `-1` (because `RecordContext` is only used in `TopicNameExtractor`)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647695386



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
##########
@@ -17,39 +17,94 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 
 /**
  * The context associated with the current record being processed by
  * an {@link Processor}
  */
 public interface RecordContext {
+
     /**
-     * @return  The offset of the original record received from Kafka;
-     *          could be -1 if it is not available
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the topic name
      */
-    long offset();
+    String topic();
 
     /**
-     * @return  The timestamp extracted from the record received from Kafka;
-     *          could be -1 if it is not available
+     * Returns the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated partition id.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the partition id
      */
-    long timestamp();
+    int partition();
 
     /**
-     * @return  The topic the record was received on;
-     *          could be null if it is not available
+     * Returns the offset of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
+     * punctuation callback}, or while processing a record that was forwarded by a punctuation
+     * callback, the record won't have an associated offset.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     *
+     * @return the offset
      */
-    String topic();
+    long offset();
 
     /**
-     * @return  The partition the record was received on;
-     *          could be -1 if it is not available
+     * Returns the current timestamp.
+     *
+     * <p> If it is triggered while processing a record streamed from the source processor,
+     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.

Review comment:
       Yes. Otherwise it renders `org.apache.kafka.clients.consumer.ConsumerRecord` but we only want to have the short `ConsumerRecord`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #10810:
URL: https://github.com/apache/kafka/pull/10810


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jlprat commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
jlprat commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647715006



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -53,9 +53,9 @@
     TaskId taskId();
 
     /**
-     * The metadata of the source record, if is one. Processors may be invoked to
+     * Return the metadata of the current record if available. Processors may be invoked to

Review comment:
       If've seen you have changed all the others to say `Return`, then this is fine




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r645023265



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -158,32 +165,52 @@ Cancellable schedule(final Duration interval,
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the topic name
      */
     String topic();
 
     /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback the record won't have an associated partition id.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the partition id
      */
     int partition();
 
     /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the offset of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback, the record won't have an associated offset.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid offset, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the headers of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the headers of the current input record.

Review comment:
       I see what you mean. I think it's worth providing a little more "context."
   
   Even in the punctuation case, it is obvious when you're specifically thinking about the relationship between upstream punctuation and downstream processors' contexts, but if you have a large Streams topology, maintained by a large team, you can easily have a situation where one person adds a punctuation that interacts poorly with logic far downstream, primarily maintained by other people.
   
   In that case, the downstream folks might be encountering a situation where they can't figure out why the headers are empty sometimes. In those cases, a little bread-crumb that says "this might be empty if..." can save someone hours or days of debugging.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
##########
@@ -43,12 +43,11 @@ public ProcessorRecordContext(final long timestamp,
                                   final int partition,
                                   final String topic,
                                   final Headers headers) {
-
         this.timestamp = timestamp;
         this.offset = offset;
         this.topic = topic;
         this.partition = partition;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);

Review comment:
       Yeah, as long as the tests pass, I think it's ok.

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -319,7 +321,7 @@ public void setRecordMetadata(final String topic,
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);

Review comment:
       If you want to introduce the invariant without introducing an NPE regression, you could just coerce a null:
   
   ```suggestion
           this.headers = headers == null ? new RecordHeaders() : headers;
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647659103



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
##########
@@ -16,19 +16,51 @@
  */
 package org.apache.kafka.streams.processor.api;
 
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+
 public interface RecordMetadata {
     /**
-     * @return  The topic of the original record received from Kafka
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.

Review comment:
       Oh boy. My recollection is that we were trying to keep a null context as a sentinel so that we would be able to return a “not present” Optional. But we are also populating “dummy” contexts in a few places, which might defeat that logic. You’d have to trace through the code to figure out whether or not this can happen. We’d better hurry up and deprecate the old API by the feature freeze so that we can simplify these code paths. Ultimately, I agree: we shouldn’t need nullable members inside an Optional container.
   
   In the mean time, I don’t think the warning is harmful. It might cause people to insert null checks that we can’t prove are unnecessary right now, but if someone wants to comb through the codebase to prove it, we can always update the Java doc later to say “never null”. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jlprat commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
jlprat commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r645405935



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -158,32 +165,52 @@ Cancellable schedule(final Duration interval,
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the topic name of the current input record; could be {@code null} if it is not

Review comment:
       My apologies if there is already a KIP, I couldn't fine one for it. But would it make sense to create new KIP to add a new method returning an `Optional<String>`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
##########
@@ -17,39 +17,82 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 
 /**
  * The context associated with the current record being processed by
  * an {@link Processor}
  */
 public interface RecordContext {
+
     /**
-     * @return  The offset of the original record received from Kafka;
-     *          could be -1 if it is not available
+     * Returns the topic name of the current input record; could be {@code null} if it is not

Review comment:
       As before, my apologies if there is already a KIP, I couldn't fine one for it. But would it make sense to create new KIP to add a new method returning an `Optional<String>`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -158,32 +165,52 @@ Cancellable schedule(final Duration interval,
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the topic name
      */
     String topic();
 
     /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback the record won't have an associated partition id.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid partition id, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the partition id
      */
     int partition();
 
     /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the offset of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback, the record won't have an associated offset.

Review comment:
       Same as previous point

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -158,32 +165,52 @@ Cancellable schedule(final Duration interval,
     void commit();
 
     /**
-     * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the topic name of the current input record; could be {@code null} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback, the record won't have an associated topic.
+     * Another example is
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+     * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
+     * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
      *
      * @return the topic name
      */
     String topic();
 
     /**
-     * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call).
+     * Returns the partition id of the current input record; could be {@code -1} if it is not
+     * available.
+     *
+     * <p> For example, if this method is invoked within a punctuate callback, or while processing a
+     * record that was forwarded by a punctuation callback the record won't have an associated partition id.

Review comment:
       Unless I'm mistaken, in line 153, as similar thing is said, but different styles are used (here no `{@link ...}` is used).
   Probably is worth unifying both styles?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -53,9 +53,9 @@
     TaskId taskId();
 
     /**
-     * The metadata of the source record, if is one. Processors may be invoked to
+     * The metadata of the current record if available. Processors may be invoked to

Review comment:
       ```suggestion
        * Returns the metadata of the current record if available. Processors may be invoked to
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r645023666



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
##########
@@ -43,12 +43,11 @@ public ProcessorRecordContext(final long timestamp,
                                   final int partition,
                                   final String topic,
                                   final Headers headers) {
-
         this.timestamp = timestamp;
         this.offset = offset;
         this.topic = topic;
         this.partition = partition;
-        this.headers = headers;
+        this.headers = Objects.requireNonNull(headers);

Review comment:
       Yeah, as long as the tests pass, I think it's ok.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10810: MINOR: Improve Kafka Streams JavaDocs with regard to record metadata

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10810:
URL: https://github.com/apache/kafka/pull/10810#discussion_r647696003



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -53,9 +53,9 @@
     TaskId taskId();
 
     /**
-     * The metadata of the source record, if is one. Processors may be invoked to
+     * Return the metadata of the current record if available. Processors may be invoked to

Review comment:
       We write JavaDocs imperatively. No `s` is intended.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org