You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/06 02:09:28 UTC
[21/21] kafka git commit: KAFKA-3477: extended KStream/KTable API to
specify custom partitioner for sinks
KAFKA-3477: extended KStream/KTable API to specify custom partitioner for sinks
Author: mjsax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1180 from mjsax/kafka-3477-streamPartitioner-DSL
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c5fe7bd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c5fe7bd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c5fe7bd
Branch: refs/heads/0.10.0
Commit: 5c5fe7bd795f5aab5248fb718c61c8ca3f2f571a
Parents: 35fadbf
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue Apr 5 15:56:09 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 2 +-
.../apache/kafka/streams/kstream/KStream.java | 88 +++++++++++++++++---
.../apache/kafka/streams/kstream/KTable.java | 88 +++++++++++++++++---
.../streams/kstream/internals/KStreamImpl.java | 39 ++++++---
.../streams/kstream/internals/KTableImpl.java | 30 ++++++-
5 files changed, 206 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 20958e4..e8fda10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -49,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* A {@link KafkaStreams} instance can co-ordinate with any other instances with the same application ID (whether in this same process, on other processes
* on this machine, or on remote machines) as a single (possibly distributed) stream processing client. These instances will divide up the work
* based on the assignment of the input topic partitions so that all partitions are being
- * consumed. If instances are added or failed, all instances will rebelance the partition assignment among themselves
+ * consumed. If instances are added or fail, all instances will rebalance the partition assignment among themselves
* to balance processing load.
* <p>
* Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 2313b8b..e4933cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
/**
* KStream is an abstraction of a <i>record stream</i> of key-value pairs.
@@ -92,7 +93,7 @@ public interface KStream<K, V> {
/**
* Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
- * using default serializers and deserializers.
+ * using default serializers and deserializers and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
* This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
*
* @param topic the topic name
@@ -100,37 +101,98 @@ public interface KStream<K, V> {
KStream<K, V> through(String topic);
/**
+ * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
+ * using default serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+ * This is equivalent to calling {@link #to(StreamPartitioner, String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
+ *
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+ * @param topic the topic name
+ */
+ KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic);
+
+ /**
* Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic.
+ * If {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer}
+ * for the key {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
+ * — otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
* This is equivalent to calling {@link #to(Serde, Serde, String)} and
* {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)}.
*
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name
*/
KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
/**
- * Materialize this stream to a topic using default serializers specified in the config.
+ * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
+ * using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+ * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)} and
+ * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)}.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+ * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
+ * — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+ * @param topic the topic name
+ */
+ KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+
+ /**
+ * Materialize this stream to a topic using default serializers specified in the config
+ * and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
*
* @param topic the topic name
*/
void to(String topic);
/**
- * Materialize this stream to a topic.
+ * Materialize this stream to a topic using default serializers specified in the config and a customizable
+ * {@link StreamPartitioner} to determine the distribution of records to partitions.
*
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- * @param topic the topic name
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+ * @param topic the topic name
+ */
+ void to(StreamPartitioner<K, V> partitioner, String topic);
+
+ /**
+ * Materialize this stream to a topic. If {@code keySerde} provides a
+ * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+ * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
+ * — otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param topic the topic name
*/
void to(Serde<K> keySerde, Serde<V> valSerde, String topic);
/**
+ * Materialize this stream to a topic using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+ * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
+ * — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+ * @param topic the topic name
+ */
+ void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+
+ /**
* Create a new {@link KStream} instance by applying a {@link org.apache.kafka.streams.kstream.Transformer} to all elements in this stream, one element at a time.
*
* @param transformerSupplier the instance of {@link TransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.Transformer}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 30ea882..581ee28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StreamPartitioner;
/**
* KTable is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
@@ -54,7 +55,7 @@ public interface KTable<K, V> {
/**
* Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
- * using default serializers and deserializers.
+ * using default serializers and deserializers and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
* This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.
*
* @param topic the topic name
@@ -62,37 +63,98 @@ public interface KTable<K, V> {
KTable<K, V> through(String topic);
/**
+ * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic using default serializers
+ * and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+ * This is equivalent to calling {@link #to(StreamPartitioner, String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.
+ *
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+ * @param topic the topic name
+ */
+ KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic);
+
+ /**
* Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic.
+ * If {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer}
+ * for the key {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
+ * — otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
* This is equivalent to calling {@link #to(Serde, Serde, String)} and
* {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String)}.
*
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default key serde defined in the configuration will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default value serde defined in the configuration will be used
- * @param topic the topic name
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param topic the topic name
*/
KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
/**
- * Materialize this stream to a topic using default serializers specified in the config.
+ * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
+ * using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+ * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)} and
+ * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String)}.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default key serde defined in the configuration will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default value serde defined in the configuration will be used
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+ * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
+ * — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+ * @param topic the topic name
+ */
+ KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+
+ /**
+ * Materialize this stream to a topic using default serializers specified in the config
+ * and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
*
* @param topic the topic name
*/
void to(String topic);
/**
- * Materialize this stream to a topic.
+ * Materialize this stream to a topic using default serializers specified in the config
+ * and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+ *
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+ * @param topic the topic name
+ */
+ void to(StreamPartitioner<K, V> partitioner, String topic);
+
+ /**
+ * Materialize this stream to a topic. If {@code keySerde} provides a
+ * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+ * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
+ * — otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
*
- * @param keySerde key serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- * @param valSerde value serde used to send key-value pairs,
- * if not specified the default serde defined in the configs will be used
- * @param topic the topic name
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param topic the topic name
*/
void to(Serde<K> keySerde, Serde<V> valSerde, String topic);
/**
+ * Materialize this stream to a topic using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+ * if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+ * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
+ * — otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+ * @param topic the topic name
+ */
+ void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+
+ /**
* Convert this stream to a new instance of {@link KStream}.
*/
KStream<K, V> toStream();
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 5889e07..0fb3984 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -194,37 +194,56 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
@Override
- public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
- to(keySerde, valSerde, topic);
+ public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
+ to(keySerde, valSerde, partitioner, topic);
return topology.stream(keySerde, valSerde, topic);
}
@Override
+ public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+ return through(keySerde, valSerde, null, topic);
+ }
+
+ @Override
+ public KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
+ return through(null, null, partitioner, topic);
+ }
+
+ @Override
public KStream<K, V> through(String topic) {
- return through(null, null, topic);
+ return through(null, null, null, topic);
}
@Override
public void to(String topic) {
- to(null, null, topic);
+ to(null, null, null, topic);
+ }
+
+ @Override
+ public void to(StreamPartitioner<K, V> partitioner, String topic) {
+ to(null, null, partitioner, topic);
}
- @SuppressWarnings("unchecked")
@Override
public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+ to(keySerde, valSerde, null, topic);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
String name = topology.newName(SINK_NAME);
- StreamPartitioner<K, V> streamPartitioner = null;
Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
- Serializer<V> valSerializer = keySerde == null ? null : valSerde.serializer();
+ Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
-
- if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
+
+ if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
- streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
+ partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
}
- topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name);
+ topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index fd464a0..156f2db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.Stores;
import java.util.Collections;
@@ -133,25 +134,46 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
@Override
public KTable<K, V> through(Serde<K> keySerde,
Serde<V> valSerde,
+ StreamPartitioner<K, V> partitioner,
String topic) {
- to(keySerde, valSerde, topic);
+ to(keySerde, valSerde, partitioner, topic);
return topology.table(keySerde, valSerde, topic);
}
@Override
+ public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+ return through(keySerde, valSerde, null, topic);
+ }
+
+ @Override
+ public KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
+ return through(null, null, partitioner, topic);
+ }
+
+ @Override
public KTable<K, V> through(String topic) {
- return through(null, null, topic);
+ return through(null, null, null, topic);
}
@Override
public void to(String topic) {
- to(null, null, topic);
+ to(null, null, null, topic);
+ }
+
+ @Override
+ public void to(StreamPartitioner<K, V> partitioner, String topic) {
+ to(null, null, partitioner, topic);
}
@Override
public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
- this.toStream().to(keySerde, valSerde, topic);
+ this.toStream().to(keySerde, valSerde, null, topic);
+ }
+
+ @Override
+ public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
+ this.toStream().to(keySerde, valSerde, partitioner, topic);
}
@Override