Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/21 20:29:50 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join

ableegoldman commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r474931289



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
+            final K2 mappedKey = maybeMappedKey.get();
+            final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
             if (leftJoin || value2 != null) {
                 context().forward(key, joiner.apply(value, value2));
             }
         }
     }
 
+    private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+        if (value == null) {
+            return Optional.empty();
+        }
+
+        // we allow the case where the key is null but mappedKey is not null and thus
+        // we need to guard against nullPointerExceptions. This may happen for GlobalKTables.
+        // For KTables, the keyMapper simply returns the key, so this will never happen
+        Optional<K2> maybeMappedKey;
+        try {
+            maybeMappedKey = Optional.ofNullable(keyMapper.apply(key, value));
+        } catch (final NullPointerException e) {

Review comment:
       Should we really catch the NPE here? If the user wants to return a non-null mapped key for a null key, they should handle the null case explicitly in their `keyMapper` rather than just throw an NPE. In general, an NPE is a sign that something has gone wrong: I would be pretty surprised if an NPE I threw explicitly in my user code got silently swallowed and was treated as if I had actually returned null.
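A minimal sketch of the alternative this comment suggests, assuming a simplified setting: `java.util.function.BiFunction` stands in for Kafka Streams' `KeyValueMapper`, and the class name `MappedKeyExtraction` and the value format `"tableKey:payload"` are hypothetical. The extraction step has no `try`/`catch`, so an NPE from the user's mapper propagates, while a mapper that wants to support null keys handles that case itself.

```java
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical stand-in for the processor's key-extraction step; BiFunction
// plays the role of KeyValueMapper. Not the actual Kafka Streams code.
final class MappedKeyExtraction {

    // No try/catch: an NPE thrown by the user's mapper reaches the caller
    // instead of being reinterpreted as "the mapped key is null".
    static <K1, V1, K2> Optional<K2> maybeExtractMappedKey(
            final BiFunction<K1, V1, K2> keyMapper, final K1 key, final V1 value) {
        if (value == null) {
            // Null value = empty message: nothing to join.
            return Optional.empty();
        }
        return Optional.ofNullable(keyMapper.apply(key, value));
    }

    // A user mapper that handles the null-key case explicitly by deriving the
    // global-table key from the value (assumed format "tableKey:payload").
    static final BiFunction<String, String, String> NULL_SAFE_MAPPER =
            (key, value) -> key != null ? key : value.split(":", 2)[0];
}
```

With this shape, a null stream key still joins when the mapper derives a key from the value, and a buggy mapper fails loudly instead of silently dropping records.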

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
+            final K2 mappedKey = maybeMappedKey.get();
+            final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
             if (leftJoin || value2 != null) {

Review comment:
       We need to remove this too, right? We shouldn't forward anything, regardless of whether it's a left join: if the mapped key is null, there's nothing to map it to.
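A minimal sketch of the behavior this comment argues for, under simplifying assumptions: a `Map` stands in for the table's `valueGetter`, a `List` stands in for `context().forward()`, and the class `NullMappedKeySketch` is hypothetical. A null mapped key drops the record even for a left join; only a non-null mapped key with no table match is forwarded (with a null right side) on a left join.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, simplified stand-in for the join step; not Kafka Streams API.
final class NullMappedKeySketch {

    static void process(
            final String mappedKey,
            final String value,
            final Map<String, String> table,
            final boolean leftJoin,
            final BiFunction<String, String, String> joiner,
            final List<String> forwarded) {
        if (mappedKey == null || value == null) {
            // No joining key (or an empty message): drop the record, even for
            // a left join, since there is nothing to match it against.
            return;
        }
        final String value2 = table.get(mappedKey);
        // A left join still emits when the key has no match (value2 == null);
        // an inner join emits only on a match.
        if (leftJoin || value2 != null) {
            forwarded.add(joiner.apply(value, value2));
        }
    }
}
```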

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));

Review comment:
       Why remove this check? `valueGetter.get` does an actual table lookup, which would be wasteful if we're going to skip this record anyway because the mapped key is null. Also, I'm pretty sure the lookup would throw an NPE.
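A minimal sketch of why the guard matters, assuming a stand-in store: the hypothetical class `GuardedLookup` wraps a `ConcurrentHashMap`, whose `get(null)` throws `NullPointerException`, in place of the real `valueGetter`, and counts lookups. The guarded form (the `mappedKey == null ? null : ...` check from the original code) never touches the store for a null key, while an unguarded lookup both costs a read and throws.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the table lookup; not the Kafka Streams valueGetter.
final class GuardedLookup {
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();
    final AtomicInteger lookups = new AtomicInteger();

    void put(final String key, final String value) {
        store.put(key, value);
    }

    // Unguarded lookup: counts every read and, like ConcurrentHashMap#get,
    // throws NullPointerException for a null key.
    String lookup(final String key) {
        lookups.incrementAndGet();
        return store.get(key);
    }

    // Guarded form: skip the lookup entirely when the mapped key is null.
    String value2For(final String mappedKey) {
        return mappedKey == null ? null : lookup(mappedKey);
    }
}
```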




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org