You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2020/02/25 17:41:37 UTC
[kafka] branch 2.5 updated: Revert "KAFKA-9533: ValueTransform forwards `null` values (#8108)"
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 0416121 Revert "KAFKA-9533: ValueTransform forwards `null` values (#8108)"
0416121 is described below
commit 0416121531960018d2404dc7c27464ed04133419
Author: bill <bb...@gmail.com>
AuthorDate: Tue Feb 25 12:33:50 2020 -0500
Revert "KAFKA-9533: ValueTransform forwards `null` values (#8108)"
This reverts commit a41d3d86c13a55a75e67b1635bc74361ebe6d7af.
---
.../kstream/internals/KStreamTransformValues.java | 5 +----
.../internals/KStreamTransformValuesTest.java | 20 --------------------
2 files changed, 1 insertion(+), 24 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 06216fc..843606b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -53,10 +53,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
@Override
public void process(final K key, final V value) {
- final R transformedValue = valueTransformer.transform(key, value);
- if (transformedValue != null) {
- context.forward(key, transformedValue);
- }
+ context.forward(key, valueTransformer.transform(key, value));
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 4dc68e0..196f71c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -34,7 +34,6 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.SingletonNoOpValueTransformer;
import org.apache.kafka.test.StreamsTestUtils;
-import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
@@ -43,7 +42,6 @@ import org.junit.runner.RunWith;
import java.util.Properties;
-import static org.easymock.EasyMock.mock;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
@@ -140,24 +138,6 @@ public class KStreamTransformValuesTest {
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
}
- @Test
- public void shouldEmitNoRecordIfTransformReturnsNull() {
- final ProcessorContext context = mock(ProcessorContext.class);
- final ValueTransformerWithKey<Integer, Integer, Integer> valueTransformer = mock(ValueTransformerWithKey.class);
- final KStreamTransformValues.KStreamTransformValuesProcessor<Integer, Integer, Integer> processor =
- new KStreamTransformValues.KStreamTransformValuesProcessor<>(valueTransformer);
- processor.init(context);
-
- final Integer inputKey = 1;
- final Integer inputValue = 10;
- EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andStubReturn(null);
- EasyMock.replay(context);
-
- processor.process(inputKey, inputValue);
-
- EasyMock.verify(context);
- }
-
@SuppressWarnings("unchecked")
@Test
public void shouldInitializeTransformerWithForwardDisabledProcessorContext() {