You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/03/12 18:06:04 UTC
[kafka] branch trunk updated: MINOR: Avoid double null check in KStream#transform() (#6429)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2aca624 MINOR: Avoid double null check in KStream#transform() (#6429)
2aca624 is described below
commit 2aca6241624f6b924b3e0164a9b7d021d80096b6
Author: cadonna <ca...@users.noreply.github.com>
AuthorDate: Tue Mar 12 19:05:53 2019 +0100
MINOR: Avoid double null check in KStream#transform() (#6429)
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../streams/kstream/internals/KStreamImpl.java | 27 +++++++++++++---------
1 file changed, 16 insertions(+), 11 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0eda64f..856536c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -439,17 +439,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder.addGraphNode(this.streamsGraphNode, sinkNode);
}
- @Override
- public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
- final String... stateStoreNames) {
- Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
- return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames);
- }
-
- @Override
- public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
- final String... stateStoreNames) {
- Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
+ private <K1, V1> KStream<K1, V1> doFlatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
+ final String... stateStoreNames) {
final String name = builder.newProcessorName(TRANSFORM_NAME);
final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
name,
@@ -465,6 +456,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
@Override
+ public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
+ final String... stateStoreNames) {
+ Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
+ return doFlatTransform(new TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames);
+ }
+
+ @Override
+ public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
+ final String... stateStoreNames) {
+ Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
+ return doFlatTransform(transformerSupplier, stateStoreNames);
+ }
+
+ @Override
public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null");