You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/16 08:51:01 UTC

[GitHub] [kafka] JoelWee opened a new pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

JoelWee opened a new pull request #9186:
URL: https://github.com/apache/kafka/pull/9186


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
JoelWee commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485126654



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
+        // We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid.

Review comment:
       I think that makes sense - but that will mean removing the previous logic of "`mappedKey` is null implies key not found in global table"? (Original line 62)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485096581



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
+        // We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid.
+        // This happens for GlobalKTables but never for KTables since keyMapper just returns the key.
+        // For non-null keys, if {@code keyMapper} returns {@code null} it implies there is no match,
         // so ignore unless it is a left join
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final K2 mappedKey = keyMapper.apply(key, value);
+        if ((key == null && mappedKey == null) || (!leftJoin && mappedKey == null) || value == null) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
             final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));

Review comment:
       At this point, we know that `mappedKey != null`, otherwise, we would have dropped the record.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485095735



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
+        // We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid.

Review comment:
       I guess we don't care about the original `key` any longer and only consider if `keyMapper` returns `null` or not?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485097186



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,23 +58,22 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join

Review comment:
       Sorry, I think my original comment here was a bit ambiguous & confusingly phrased. What I meant was that the _removal_ of the comment seemed correct to me, ie we should not make any special exceptions for the left join case and should remove the `leftJoin` part of the `if (leftJoin || value2 != null) ` check down on line 79




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r474931289



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
+            final K2 mappedKey = maybeMappedKey.get();
+            final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
             if (leftJoin || value2 != null) {
                 context().forward(key, joiner.apply(value, value2));
             }
         }
     }
 
+    private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+        if (value == null) {
+            return Optional.empty();
+        }
+
+        // we allow the case where the key is null but mappedKey is not null and thus
+        // we need to guard against nullPointerExceptions. This may happen for GlobalKTables.
+        // For KTables, the keyMapper simply returns the key, so this will never happen
+        Optional<K2> maybeMappedKey;
+        try {
+            maybeMappedKey = Optional.ofNullable(keyMapper.apply(key, value));
+        } catch (final NullPointerException e) {

Review comment:
       Should we really catch NPE here? It seems like if the user wants to return a non-null mapped key from a null key, then they should handle the null case specifically in their `keyMapper` and not just throw an NPE. In general, an NPE is a sign that something has gone wrong. I would be pretty surprised if I threw an NPE explicitly in my user code and it just got swallowed and interpreted as if I had actually returned null.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
+            final K2 mappedKey = maybeMappedKey.get();
+            final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
             if (leftJoin || value2 != null) {

Review comment:
       We need to remove this too, right? We shouldn't forward anything regardless of whether it's a left join, if the mapped key is null then there's nothing to map it to

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));

Review comment:
       Why remove this check? The `valueGetter.get` does an actual table lookup, which would be wasteful if we're going to skip this record anyways because the mapped key is null. Also, I'm pretty sure the lookup would throw an NPE




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485218625



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
+        // We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid.

Review comment:
       Well, if `mappedKey` is null then there can't be a match in the global table since we can't do a lookup with a null key. I think what @mjsax means here (correct me if wrong) is just that we could phrase it a bit differently to say something like
   ```
   // If the mappedKey is null, we ignore it as invalid. This should never happen for KTables since keyMapper 
   // just returns the key, but for GlobalKTables a non-null key can result in a null mappedKey. Since there 
   // can't be a match for a null mappedKey, we drop it
   ```
   
   ...or something. Thoughts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
JoelWee commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r483998338



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,23 +58,22 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join

Review comment:
       Yea I agree the comment is right and we need to allow the case of non-null `key` with null `mappedKey` when it's a left join, since we view a null `mappedKey` as equivalent to "key not found in GlobalKTable" (except if the `key` is null, in which it should just be invalid I think?). Have made the changes now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485217278



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,23 +58,22 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join

Review comment:
       Ah yeah sorry, I'm getting things mixed up here...this comment is referring to when the mappedKey is null while the condition I cited below now only applies to when the value is null.  Your reasoning sounds correct, we should still process the record in that case if it's a left join. But we should also remove this comment, since if the mappedKey is null then we drop it, regardless of if its a left join or any other




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
JoelWee commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r475196944



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));

Review comment:
       `maybeExtractMappedKey` will return `Optional.empty()` if the `mappedKey` is null. So `mappedKey` will never be null in this portion of the code and we can skip the check I think




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r475792745



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
+            final K2 mappedKey = maybeMappedKey.get();
+            final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
             if (leftJoin || value2 != null) {
                 context().forward(key, joiner.apply(value, value2));
             }
         }
     }
 
+    private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+        if (value == null) {
+            return Optional.empty();

Review comment:
       This seems a little subtle. Can we just return the actual mapped key (or `.empty()`) in this method, and keep the explicit null check for `value` up above?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#issuecomment-678455951


   @ableegoldman Can you help to review?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
JoelWee commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r477019086



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
+            final K2 mappedKey = maybeMappedKey.get();
+            final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
             if (leftJoin || value2 != null) {
                 context().forward(key, joiner.apply(value, value2));
             }
         }
     }
 
+    private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+        if (value == null) {
+            return Optional.empty();

Review comment:
       Yep agreed, was originally worried about NPEs arising from the null value as well. Done now :) 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
+            final K2 mappedKey = maybeMappedKey.get();
+            final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
             if (leftJoin || value2 != null) {
                 context().forward(key, joiner.apply(value, value2));
             }
         }
     }
 
+    private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+        if (value == null) {
+            return Optional.empty();

Review comment:
       Yep agreed – was originally worried about NPEs arising from the null value as well. Done now :) 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485097186



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,23 +58,22 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join

Review comment:
       Sorry, I think my original comment here was a bit ambiguous. What I meant was that the _removal_ of the comment seemed correct to me, ie we should not make any special exceptions for the left join case and should remove the `leftJoin` part of the `if (leftJoin || value2 != null) ` check down on line 79




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #9186:
URL: https://github.com/apache/kafka/pull/9186


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485096581



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
+        // We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid.
+        // This happens for GlobalKTables but never for KTables since keyMapper just returns the key.
+        // For non-null keys, if {@code keyMapper} returns {@code null} it implies there is no match,
         // so ignore unless it is a left join
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final K2 mappedKey = keyMapper.apply(key, value);
+        if ((key == null && mappedKey == null) || (!leftJoin && mappedKey == null) || value == null) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
             final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));

Review comment:
       At this point, we know that `mappedKey != null`, otherwise, we would drop the record.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485096090



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
+        // We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid.
+        // This happens for GlobalKTables but never for KTables since keyMapper just returns the key.
+        // For non-null keys, if {@code keyMapper} returns {@code null} it implies there is no match,
         // so ignore unless it is a left join
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final K2 mappedKey = keyMapper.apply(key, value);
+        if ((key == null && mappedKey == null) || (!leftJoin && mappedKey == null) || value == null) {

Review comment:
       This condition seems unnecessary complex. Should it not just be:
   ```
   if (mappedKey == null || value == null) {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#issuecomment-678455265


   Thanks for the PR @JoelWee -- Not exactly sure what NPE test you have in mind, but feel free to add a new test class `src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessorTest.java` if required.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
JoelWee commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485130942



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,23 +58,22 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join

Review comment:
       Right, thanks for the clarification! Wouldn't we still want the` leftJoin` in that case though? When we reach the `leftJoin` in the [code](https://github.com/apache/kafka/pull/9186/commits/e9616c64dfdc33481d0b831f80ecd0385801c761), the `mappedKey` is never null but it might not exist in the GlobalKTable (and so `value2` is null). If we're doing a `leftJoin`, then we'll want to allow these null values? (If not, the `leftJoin` is just the same as the normal `join`?)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r475800482



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
+            final K2 mappedKey = maybeMappedKey.get();
+            final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
             if (leftJoin || value2 != null) {
                 context().forward(key, joiner.apply(value, value2));
             }
         }
     }
 
+    private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+        if (value == null) {
+            return Optional.empty();
+        }
+
+        // we allow the case where the key is null but mappedKey is not null and thus
+        // we need to guard against nullPointerExceptions. This may happen for GlobalKTables.
+        // For KTables, the keyMapper simply returns the key, so this will never happen
+        Optional<K2> maybeMappedKey;
+        try {
+            maybeMappedKey = Optional.ofNullable(keyMapper.apply(key, value));
+        } catch (final NullPointerException e) {

Review comment:
       Yeah the compatibility argument is reasonable, but you could say that users should have been handling the null case all along and it was just due to a bug in Streams that we never actually passed in a null key. If people _aren't_ handling null, we should try and alert them quickly (and nothing catches people's attention faster than an NPE)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] JoelWee commented on pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
JoelWee commented on pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#issuecomment-674500361


   [KAFKA-10277](https://issues.apache.org/jira/browse/KAFKA-10277?jql=project%20%3D%20KAFKA%20AND%20labels%20%3D%20newbie%20AND%20status%20%3D%20Open%20ORDER%20BY%20updated%20DESC)
   
   Hi @mjsax, please could you have a look? :)
   
   It feels like if implemented this way, we should have a NullPointerException test for the processor, but I'm not sure where that test should be put in. It fits best as a direct unit test for the processor but it doesn't look like any of those tests are done. And it's somewhat inconvenient to add it to the existing join tests because of the way they are set up


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
JoelWee commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r475198868



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
+            final K2 mappedKey = maybeMappedKey.get();
+            final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
             if (leftJoin || value2 != null) {
                 context().forward(key, joiner.apply(value, value2));
             }
         }
     }
 
+    private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+        if (value == null) {
+            return Optional.empty();
+        }
+
+        // we allow the case where the key is null but mappedKey is not null and thus
+        // we need to guard against nullPointerExceptions. This may happen for GlobalKTables.
+        // For KTables, the keyMapper simply returns the key, so this will never happen
+        Optional<K2> maybeMappedKey;
+        try {
+            maybeMappedKey = Optional.ofNullable(keyMapper.apply(key, value));
+        } catch (final NullPointerException e) {

Review comment:
       Yea I think what you say makes sense. When I wrote this I was thinking about backwards compatibility: previously all null keys would never be passed to keyMappers but now they are. It feels like existing users might be surprised if their existing keyMappers start throwing NPEs. But it would also be suprising if the NPE a user throws get swallowed as you mentioned. 
   
   I agree that it's better not to catch the NPE and it possibly looks like users should have always been handling the case where the key is null anyway so it should be fine?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#issuecomment-701582027


   Thanks for the fix @JoelWee! Merged to `trunk`.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485218625



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
+        // We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid.

Review comment:
       Well, if `mappedKey` is null then there can't be a match in the global table since we can't do a lookup with a null key. I think what @mjsax means here (correct me if wrong) is just that we could phrase it a bit differently to say something like
   ```
   // If the mappedKey is null, we ignore it as invalid. This should never happen for KTables 
   // since keyMapper just returns the key, but for GlobalKTables a non-null key can result 
   // in a null mappedKey. There can't be a match for a null mappedKey, so we drop it
   ```
   
   ...or something. Thoughts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#issuecomment-681138143


   Retest this please.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485218625



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
+        // We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid.

Review comment:
       Well, if `mappedKey` is null then there can't be a match in the global table since we can't do a lookup with a null key. I think what @mjsax means here (correct me if wrong) is just that we could phrase it a bit differently to say something like
   "If the mappedKey is null, we ignore it as invalid. This should never happen for KTables since keyMapper just returns the key, but for GlobalKTables a non-null key can result in a null mappedKey. Since there can't be a match for a null mappedKey, we drop it"
   
   ...or something. Thoughts?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
+        // We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid.

Review comment:
       Well, if `mappedKey` is null then there can't be a match in the global table since we can't do a lookup with a null key. I think what @mjsax means here (correct me if wrong) is just that we could phrase it a bit differently to say something like
   ```
   If the mappedKey is null, we ignore it as invalid. This should never happen for KTables since keyMapper just returns the key, but for GlobalKTables a non-null key can result in a null mappedKey. Since there can't be a match for a null mappedKey, we drop it
   ```
   
   ...or something. Thoughts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r481602010



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,23 +58,22 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join

Review comment:
       It looks like we removed this comment about not ignoring it if it's a left join, but we didn't actually remove the code for that (yet). Which one is right? It seems like the comment is correct, and we shouldn't ignore the null key regardless of whether it is a left join. In that case, we should remove the `leftJoin`  part of the condition on line 78 below




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] JoelWee commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
JoelWee commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r485244413



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,22 +58,23 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
+        // We allow null keys unless {@code keyMapper} returns {@code null} and we ignore it as invalid.

Review comment:
       Yep makes sense. Have updated it now. Just noting here that this means we're changing a [test](https://github.com/apache/kafka/pull/9186/files#diff-e3715715832b244da2d8787362b0c570R230) we previously had




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r475791126



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));

Review comment:
       Oh yeah, duh. Nevermind this 🙂 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org