You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/18 14:32:07 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #11356: [KAFKA-10539] Convert KStreamImpl joins to new PAPI

vvcephei commented on a change in pull request #11356:
URL: https://github.com/apache/kafka/pull/11356#discussion_r730987030



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -200,11 +217,11 @@ private void emitNonJoinedOuterRecords(final KeyValueStore<TimestampedKeyAndJoin
             // throttle the emit frequency to a (configurable) interval;
             // we use processing time to decouple from data properties,
             // as throttling is a non-functional performance optimization
-            if (context.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {
+            if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {

Review comment:
       This was an oversight in the kip that added "current system time" to the public API. They only added it to the deprecated ProcessorContext. I planned to send a PR to fix it, but for now this is a fine solution.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -108,26 +116,35 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
             }
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public void process(final K key, final V1 value) {
+        public void process(final Record<K, V1> record) {
             // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
             //
             // we also ignore the record if value is null, because in a key-value data model a null-value indicates
             // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
             // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
             // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-            if (key == null || value == null) {
-                LOG.warn(
-                    "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    key, value, context().topic(), context().partition(), context().offset()
-                );
+            if (record.key() == null || record.value() == null) {
+                if (context().recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context().recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key or value. "
+                            + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null key. Topic, partition, and offset not known."

Review comment:
       ```suggestion
                           "Skipping record due to null key or value. Topic, partition, and offset not known."
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org