You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2020/02/20 16:46:45 UTC

[kafka] branch 2.2 updated: KAFKA-9533: ValueTransform forwards `null` values (#8108)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 5fbe447  KAFKA-9533: ValueTransform forwards `null` values (#8108)
5fbe447 is described below

commit 5fbe4472bd42ed2b3b6e35b6d0bdc5238a6d0868
Author: Michael Viamari <mv...@users.noreply.github.com>
AuthorDate: Wed Feb 19 13:20:35 2020 -0800

    KAFKA-9533: ValueTransform forwards `null` values (#8108)
    
    Fixes a bug where KStream#transformValues would forward null values returned by the provided ValueTransformer#transform operation.
    
    A test, KStreamTransformValuesTest#shouldEmitNoRecordIfTransformReturnsNull, was added for verification. A parallel test for the non-key ValueTransformer was not added, as both variants share the same code path.
    
    Reviewers: Bruno Cadonna <br...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
 .../kstream/internals/KStreamTransformValues.java    |  5 ++++-
 .../internals/KStreamTransformValuesTest.java        | 20 ++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 843606b..06216fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -53,7 +53,10 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
 
         @Override
         public void process(final K key, final V value) {
-            context.forward(key, valueTransformer.transform(key, value));
+            final R transformedValue = valueTransformer.transform(key, value);
+            if (transformedValue != null) {
+                context.forward(key, transformedValue);
+            }
         }
 
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 94d06eb..e05bc27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.SingletonNoOpValueTransformer;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
 import org.easymock.MockType;
@@ -41,6 +42,7 @@ import org.junit.runner.RunWith;
 
 import java.util.Properties;
 
+import static org.easymock.EasyMock.mock;
 import static org.hamcrest.CoreMatchers.isA;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
@@ -142,6 +144,24 @@ public class KStreamTransformValuesTest {
         assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
     }
 
+    @Test
+    public void shouldEmitNoRecordIfTransformReturnsNull() {
+        final ProcessorContext context = mock(ProcessorContext.class);
+        final ValueTransformerWithKey<Integer, Integer, Integer> valueTransformer = mock(ValueTransformerWithKey.class);
+        final KStreamTransformValues.KStreamTransformValuesProcessor<Integer, Integer, Integer> processor =
+            new KStreamTransformValues.KStreamTransformValuesProcessor<>(valueTransformer);
+        processor.init(context);
+
+        final Integer inputKey = 1;
+        final Integer inputValue = 10;
+        EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andStubReturn(null);
+        EasyMock.replay(context);
+
+        processor.process(inputKey, inputValue);
+
+        EasyMock.verify(context);
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {