You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/07 07:54:14 UTC
kafka git commit: KAFKA-5816;
add Produced class, KStream#to(topic, Produced), and
KStream#through(topic, Produced)
Repository: kafka
Updated Branches:
refs/heads/trunk b041c8d87 -> 667cd60dc
KAFKA-5816; add Produced class, KStream#to(topic, Produced), and KStream#through(topic, Produced)
Add the `Produced` class and `KStream` overloads that use it:
`KStream#to(String, Produced)`
`KStream#through(String, Produced)`
Deprecate all other to and through methods except the single-param methods that take only a topic
Author: Damian Guy <da...@gmail.com>
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3770 from dguy/kafka-5652-produced
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/667cd60d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/667cd60d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/667cd60d
Branch: refs/heads/trunk
Commit: 667cd60dc6ba68831423a256b6e455f7d955581c
Parents: b041c8d
Author: Damian Guy <da...@gmail.com>
Authored: Thu Sep 7 08:54:10 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Sep 7 08:54:10 2017 +0100
----------------------------------------------------------------------
docs/streams/developer-guide.html | 18 +-
.../apache/kafka/streams/kstream/KStream.java | 41 +++++
.../apache/kafka/streams/kstream/Produced.java | 163 +++++++++++++++++++
.../streams/kstream/internals/KStreamImpl.java | 34 ++--
.../kstream/internals/KStreamImplTest.java | 47 ++++++
5 files changed, 287 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 4da50a2..10220fb 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -1955,7 +1955,9 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
</p>
<pre class="brush: java;">
- joined.to("topic4");
+ joined.to("topic4");
+ // or using custom Serdes and a StreamPartitioner
+ joined.to("topic5", Produced.with(keySerde, valueSerde, myStreamPartitioner));
</pre>
If your application needs to continue reading and processing the records after they have been materialized
@@ -1963,11 +1965,15 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
Kafka Streams provides a convenience method called <code>through</code>:
<pre class="brush: java;">
- // equivalent to
- //
- // joined.to("topic4");
- // materialized = builder.stream("topic4");
- KStream<String, String> materialized = joined.through("topic4");
+ // equivalent to
+ //
+ // joined.to("topic4");
+ // materialized = builder.stream("topic4");
+ KStream<String, String> materialized = joined.through("topic4");
+ // if you need to provide serdes or a custom StreamPartitioner you can use
+ // the overloaded version
+ KStream<String, String> materializedWithProduced = joined.through("topic5",
+ Produced.with(keySerde, valueSerde, myStreamPartitioner));
</pre>
<br>
http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index b8b5b8d..5a36cde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -675,7 +675,9 @@ public interface KStream<K, V> {
* if not specified producer's {@link DefaultPartitioner} will be used
* @param topic the topic name
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
+ * @deprecated use {@code through(String, Produced)}
*/
+ @Deprecated
KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic);
@@ -696,7 +698,9 @@ public interface KStream<K, V> {
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
+ * @deprecated use {@code through(String, Produced)}
*/
+ @Deprecated
KStream<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic);
@@ -721,13 +725,33 @@ public interface KStream<K, V> {
* be used
* @param topic the topic name
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
+ * @deprecated use {@code through(String, Produced)}
*/
+ @Deprecated
KStream<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final StreamPartitioner<? super K, ? super V> partitioner,
final String topic);
/**
+ * Materialize this stream to a topic and create a new {@code KStream} from the topic using the
+ * {@link Produced} instance for configuration of the {@link Serde key serde}, {@link Serde value serde},
+ * and {@link StreamPartitioner}.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ * <p>
+ * This is equivalent to calling {@link #to(String, Produced) to(topic, produced)}
+ * and {@link StreamsBuilder#stream(Serde, Serde, String...)
+ * StreamsBuilder#stream(produced.keySerde(), produced.valueSerde(), topic)}.
+ *
+ * @param topic the topic name
+ * @param produced the options to use when producing to the topic; must not be {@code null}
+ * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
+ */
+ KStream<K, V> through(final String topic,
+ final Produced<K, V> produced);
+
+ /**
* Materialize this stream to a topic using default serializers specified in the config and producer's
* {@link DefaultPartitioner}.
* The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
@@ -746,7 +770,9 @@ public interface KStream<K, V> {
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified producer's {@link DefaultPartitioner} will be used
* @param topic the topic name
+ * @deprecated use {@code to(String, Produced)}
*/
+ @Deprecated
void to(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic);
@@ -762,7 +788,9 @@ public interface KStream<K, V> {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default serde defined in the configs will be used
* @param topic the topic name
+ * @deprecated use {@code to(String, Produced)}
*/
+ @Deprecated
void to(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic);
@@ -782,13 +810,26 @@ public interface KStream<K, V> {
* {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will
* be used
* @param topic the topic name
+ * @deprecated use {@code to(String, Produced)}
*/
+ @Deprecated
void to(final Serde<K> keySerde,
final Serde<V> valSerde,
final StreamPartitioner<? super K, ? super V> partitioner,
final String topic);
/**
+ * Materialize this stream to a topic using the provided {@link Produced} instance.
+ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+ * started).
+ *
+ * @param topic the topic name
+ * @param produced the options to use when producing to the topic; must not be {@code null}
+ */
+ void to(final String topic,
+ final Produced<K, V> produced);
+
+ /**
* Transform each record of the input stream into zero or more records in the output stream (both key and value type
* can be altered arbitrarily).
* A {@link Transformer} (provided by the given {@link TransformerSupplier}) is applied to each input record and
http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
new file mode 100644
index 0000000..488bd15
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
+import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+/**
+ * Optional parameters (key serde, value serde and {@link StreamPartitioner}) used when producing to a topic
+ * via {@link KStream#through(String, Produced)} or {@link KStream#to(String, Produced)}. Instances are mutable: {@code withKeySerde}, {@code withValueSerde} and {@code withStreamPartitioner} modify and return this instance.
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class Produced<K, V> {
+
+ private Serde<K> keySerde;
+ private Serde<V> valueSerde;
+ private StreamPartitioner<? super K, ? super V> partitioner;
+
+ private Produced(final Serde<K> keySerde, // each argument may be null, meaning "not configured"
+ final Serde<V> valueSerde,
+ final StreamPartitioner<? super K, ? super V> partitioner) {
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ this.partitioner = partitioner;
+ }
+
+ /**
+ * Create a Produced instance with provided keySerde and valueSerde.
+ * @param keySerde Serde to use for serializing the key
+ * @param valueSerde Serde to use for serializing the value
+ * @param <K> key type
+ * @param <V> value type
+ * @return A new {@link Produced} instance configured with keySerde and valueSerde
+ * @see KStream#through(String, Produced)
+ * @see KStream#to(String, Produced)
+ */
+ public static <K, V> Produced<K, V> with(final Serde<K> keySerde,
+ final Serde<V> valueSerde) {
+ return new Produced<>(keySerde, valueSerde, null);
+ }
+
+ /**
+ * Create a Produced instance with provided keySerde, valueSerde, and partitioner.
+ * @param keySerde Serde to use for serializing the key
+ * @param valueSerde Serde to use for serializing the value
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if {@code null} and {@code keySerde} provides a {@link WindowedSerializer} for the key
+ * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will be used
+ * @param <K> key type
+ * @param <V> value type
+ * @return A new {@link Produced} instance configured with keySerde, valueSerde, and partitioner
+ * @see KStream#through(String, Produced)
+ * @see KStream#to(String, Produced)
+ */
+ public static <K, V> Produced<K, V> with(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final StreamPartitioner<? super K, ? super V> partitioner) {
+ return new Produced<>(keySerde, valueSerde, partitioner);
+ }
+
+ /**
+ * Create a Produced instance with provided keySerde.
+ * @param keySerde Serde to use for serializing the key
+ * @param <K> key type
+ * @param <V> value type
+ * @return A new {@link Produced} instance configured with keySerde
+ * @see KStream#through(String, Produced)
+ * @see KStream#to(String, Produced)
+ */
+ public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde) {
+ return new Produced<>(keySerde, null, null);
+ }
+
+ /**
+ * Create a Produced instance with provided valueSerde.
+ * @param valueSerde Serde to use for serializing the value
+ * @param <K> key type
+ * @param <V> value type
+ * @return A new {@link Produced} instance configured with valueSerde
+ * @see KStream#through(String, Produced)
+ * @see KStream#to(String, Produced)
+ */
+ public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde) {
+ return new Produced<>(null, valueSerde, null);
+ }
+
+ /**
+ * Create a Produced instance with provided partitioner.
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if {@code null} and the key serde provides a {@link WindowedSerializer} for the key
+ * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will be used
+ * @param <K> key type
+ * @param <V> value type
+ * @return A new {@link Produced} instance configured with partitioner
+ * @see KStream#through(String, Produced)
+ * @see KStream#to(String, Produced)
+ */
+ public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
+ return new Produced<>(null, null, partitioner);
+ }
+
+ /**
+ * Produce records using the provided partitioner. Modifies this instance in place.
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if {@code null} and the key serde provides a {@link WindowedSerializer} for the key
+ * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will be used
+ * @return this
+ */
+ public Produced<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
+ this.partitioner = partitioner;
+ return this;
+ }
+
+ /**
+ * Produce records using the provided valueSerde. Modifies this instance in place.
+ * @param valueSerde Serde to use for serializing the value
+ * @return this
+ */
+ public Produced<K, V> withValueSerde(final Serde<V> valueSerde) {
+ this.valueSerde = valueSerde;
+ return this;
+ }
+
+ /**
+ * Produce records using the provided keySerde. Modifies this instance in place.
+ * @param keySerde Serde to use for serializing the key
+ * @return this
+ */
+ public Produced<K, V> withKeySerde(final Serde<K> keySerde) {
+ this.keySerde = keySerde;
+ return this;
+ }
+
+ public Serde<K> keySerde() { // null if no key serde was configured
+ return keySerde;
+ }
+
+ public Serde<V> valueSerde() { // null if no value serde was configured
+ return valueSerde;
+ }
+
+ public StreamPartitioner<? super K, ? super V> streamPartitioner() { // null if no partitioner was configured
+ return partitioner;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 8534da8..8aa7c58 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.PrintForeachAction;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -379,9 +380,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public KStream<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final StreamPartitioner<? super K, ? super V> partitioner, String topic) {
- to(keySerde, valSerde, partitioner, topic);
- return builder.stream(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic);
+ return through(topic, Produced.with(keySerde, valSerde, partitioner));
+ }
+
+ @Override
+ public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
+ to(topic, produced); // to(String, Produced) rejects a null topic or a null produced
+ return builder.stream(null, new FailOnInvalidTimestamp(), produced.keySerde(), produced.valueSerde(), topic); // read the topic back with the serdes used to write it
}
@Override
@@ -406,13 +412,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public KStream<K, V> through(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic) {
- return through(keySerde, valSerde, null, topic);
+ return through(topic, Produced.with(keySerde, valSerde));
}
@Override
public KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic) {
- return through(null, null, partitioner, topic);
+ return through(topic, Produced.streamPartitioner(partitioner));
}
@Override
@@ -422,20 +428,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public void to(final String topic) {
- to(null, null, null, topic);
+ to(topic, Produced.<K, V>with(null, null, null));
}
@Override
public void to(final StreamPartitioner<? super K, ? super V> partitioner,
final String topic) {
- to(null, null, partitioner, topic);
+ to(topic, Produced.streamPartitioner(partitioner));
}
@Override
public void to(final Serde<K> keySerde,
final Serde<V> valSerde,
final String topic) {
- to(keySerde, valSerde, null, topic);
+ to(topic, Produced.with(keySerde, valSerde));
}
@SuppressWarnings("unchecked")
@@ -445,10 +451,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final StreamPartitioner<? super K, ? super V> partitioner,
final String topic) {
Objects.requireNonNull(topic, "topic can't be null");
- final String name = builder.newName(SINK_NAME);
+ to(topic, Produced.with(keySerde, valSerde, partitioner));
+ }
- final Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
- final Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
+ @SuppressWarnings("unchecked")
+ @Override
+ public void to(final String topic, final Produced<K, V> produced) {
+ Objects.requireNonNull(topic, "topic can't be null");
+ Objects.requireNonNull(produced, "Produced can't be null");
+ final String name = builder.newName(SINK_NAME);
+ final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
+ final Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer();
+ final StreamPartitioner<? super K, ? super V> partitioner = produced.streamPartitioner();
if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
http://git-wip-us.apache.org/repos/asf/kafka/blob/667cd60d/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 1fed374..ca454f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -30,20 +30,24 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -57,6 +61,9 @@ public class KStreamImplTest {
private KStream<String, String> testStream;
private StreamsBuilder builder;
+ @Rule
+ public final KStreamTestDriver driver = new KStreamTestDriver();
+
@Before
public void before() {
builder = new StreamsBuilder();
@@ -179,6 +186,33 @@ public class KStreamImplTest {
}
@Test
+ public void shouldSendDataThroughTopicUsingProduced() {
+ final StreamsBuilder builder = new StreamsBuilder(); // local builder; shadows the field initialised in before()
+ final String input = "topic";
+ final KStream<String, String> stream = builder.stream(stringSerde, stringSerde, input);
+ final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+ stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);
+
+ driver.setUp(builder);
+ driver.process(input, "a", "b");
+ assertThat(processorSupplier.processed, equalTo(Collections.singletonList("a:b"))); // processor downstream of through() must see the input record
+ }
+
+ @Test
+ public void shouldSendDataToTopicUsingProduced() {
+ final StreamsBuilder builder = new StreamsBuilder(); // local builder; shadows the field initialised in before()
+ final String input = "topic";
+ final KStream<String, String> stream = builder.stream(stringSerde, stringSerde, input);
+ final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+ stream.to("to-topic", Produced.with(stringSerde, stringSerde));
+ builder.stream(stringSerde, stringSerde, "to-topic").process(processorSupplier); // consume "to-topic" so the output of to() can be observed
+
+ driver.setUp(builder);
+ driver.process(input, "e", "f");
+ assertThat(processorSupplier.processed, equalTo(Collections.singletonList("e:f")));
+ }
+
+ @Test
// TODO: this test should be refactored when we removed KStreamBuilder so that the created Topology contains internal topics as well
public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
final KStreamBuilder builder = new KStreamBuilder();
@@ -376,6 +410,18 @@ public class KStreamImplTest {
null);
}
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnThroughWhenProducedIsNull() {
+ testStream.through("topic", null); // through delegates to to(String, Produced), which null-checks produced
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowNullPointerOnToWhenProducedIsNull() {
+ testStream.to("topic", null); // rejected by Objects.requireNonNull(produced, ...) in to(String, Produced)
+ }
+
+
@Test
public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), "blah");
@@ -411,4 +457,5 @@ public class KStreamImplTest {
public void shouldThrowNullPointerOnOuterJoinJoinedIsNull() {
testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10), null);
}
+
}