You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/06 02:09:28 UTC

[21/21] kafka git commit: KAFKA-3477: extended KStream/KTable API to specify custom partitioner for sinks

KAFKA-3477: extended KStream/KTable API to specify custom partitioner for sinks

Author: mjsax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1180 from mjsax/kafka-3477-streamPartitioner-DSL


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c5fe7bd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c5fe7bd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c5fe7bd

Branch: refs/heads/0.10.0
Commit: 5c5fe7bd795f5aab5248fb718c61c8ca3f2f571a
Parents: 35fadbf
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue Apr 5 15:56:09 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  2 +-
 .../apache/kafka/streams/kstream/KStream.java   | 88 +++++++++++++++++---
 .../apache/kafka/streams/kstream/KTable.java    | 88 +++++++++++++++++---
 .../streams/kstream/internals/KStreamImpl.java  | 39 ++++++---
 .../streams/kstream/internals/KTableImpl.java   | 30 ++++++-
 5 files changed, 206 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 20958e4..e8fda10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -49,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same application ID (whether in this same process, on other processes
  * on this machine, or on remote machines) as a single (possibly distributed) stream processing client. These instances will divide up the work
  * based on the assignment of the input topic partitions so that all partitions are being
- * consumed. If instances are added or failed, all instances will rebelance the partition assignment among themselves
+ * consumed. If instances are added or failed, all instances will rebalance the partition assignment among themselves
  * to balance processing load.
  * <p>
  * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 2313b8b..e4933cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 
 /**
  * KStream is an abstraction of a <i>record stream</i> of key-value pairs.
@@ -92,7 +93,7 @@ public interface KStream<K, V> {
 
     /**
      * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
-     * using default serializers and deserializers.
+     * using default serializers and deserializers and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
      * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
      *
      * @param topic     the topic name
@@ -100,37 +101,98 @@ public interface KStream<K, V> {
     KStream<K, V> through(String topic);
 
     /**
+     * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
+     * using default serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * This is equivalent to calling {@link #to(StreamPartitioner, String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
+     *
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
      * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic.
+     * If {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer}
+     * for the key {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
+     * &mdash; otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
      * This is equivalent to calling {@link #to(Serde, Serde, String)} and
      * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)}.
      *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default key serde defined in the configuration will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default value serde defined in the configuration will be used
+     * @param topic        the topic name
      */
     KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
-     * Materialize this stream to a topic using default serializers specified in the config.
+     * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
+     * using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)} and
+     * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)}.
+     *
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default key serde defined in the configuration will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default value serde defined in the configuration will be used
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+     *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
+     *                     &mdash; otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
+     * Materialize this stream to a topic using default serializers specified in the config
+     * and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
      *
      * @param topic     the topic name
      */
     void to(String topic);
 
     /**
-     * Materialize this stream to a topic.
+     * Materialize this stream to a topic using default serializers specified in the config and a customizable
+     * {@link StreamPartitioner} to determine the distribution of records to partitions.
      *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default serde defined in the configs will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default serde defined in the configs will be used
-     * @param topic     the topic name
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    void to(StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
+     * Materialize this stream to a topic. If {@code keySerde} provides a
+     * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+     * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
+     * &mdash; otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
+     *
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param topic        the topic name
      */
     void to(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
+     * Materialize this stream to a topic using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     *
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+     *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
+     *                     &mdash; otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
      * Create a new {@link KStream} instance by applying a {@link org.apache.kafka.streams.kstream.Transformer} to all elements in this stream, one element at a time.
      *
      * @param transformerSupplier   the instance of {@link TransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.Transformer}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 30ea882..581ee28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 
 /**
  * KTable is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
@@ -54,7 +55,7 @@ public interface KTable<K, V> {
 
     /**
      * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
-     * using default serializers and deserializers.
+     * using default serializers and deserializers and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
      * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.
      *
      * @param topic         the topic name
@@ -62,37 +63,98 @@ public interface KTable<K, V> {
     KTable<K, V> through(String topic);
 
     /**
+     * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic using default serializers
+     * and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.
+     *
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
      * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic.
+     * If {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer}
+     * for the key {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
+     * &mdash; otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
      * This is equivalent to calling {@link #to(Serde, Serde, String)} and
      * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String)}.
      *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default key serde defined in the configuration will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default value serde defined in the configuration will be used
+     * @param topic        the topic name
      */
     KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
-     * Materialize this stream to a topic using default serializers specified in the config.
+     * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
+     * using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)} and
+     * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String)}.
+     *
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default key serde defined in the configuration will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default value serde defined in the configuration will be used
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+     *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
+     *                     &mdash; otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
+     * Materialize this stream to a topic using default serializers specified in the config
+     * and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
      *
      * @param topic         the topic name
      */
     void to(String topic);
 
     /**
-     * Materialize this stream to a topic.
+     * Materialize this stream to a topic using default serializers specified in the config
+     * and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     *
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    void to(StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
+     * Materialize this stream to a topic. If {@code keySerde} provides a
+     * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+     * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
+     * &mdash; otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
      *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default serde defined in the configs will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default serde defined in the configs will be used
-     * @param topic     the topic name
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param topic        the topic name
      */
     void to(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
+     * Materialize this stream to a topic using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     *
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+     *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
+     *                     &mdash; otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
      * Convert this stream to a new instance of {@link KStream}.
      */
     KStream<K, V> toStream();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 5889e07..0fb3984 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -194,37 +194,56 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
-        to(keySerde, valSerde, topic);
+    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
+        to(keySerde, valSerde, partitioner, topic);
 
         return topology.stream(keySerde, valSerde, topic);
     }
 
     @Override
+    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+        return through(keySerde, valSerde, null, topic);
+    }
+
+    @Override
+    public KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
+        return through(null, null, partitioner, topic);
+    }
+
+    @Override
     public KStream<K, V> through(String topic) {
-        return through(null, null, topic);
+        return through(null, null, null, topic);
     }
 
     @Override
     public void to(String topic) {
-        to(null, null, topic);
+        to(null, null, null, topic);
+    }
+
+    @Override
+    public void to(StreamPartitioner<K, V> partitioner, String topic) {
+        to(null, null, partitioner, topic);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+        to(keySerde, valSerde, null, topic);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
         String name = topology.newName(SINK_NAME);
-        StreamPartitioner<K, V> streamPartitioner = null;
 
         Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
         Serializer<V> valSerializer = keySerde == null ? null : valSerde.serializer();
-
-        if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
+        
+        if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
             WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
-            streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
+            partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
         }
 
-        topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name);
+        topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index fd464a0..156f2db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.Stores;
 
 import java.util.Collections;
@@ -133,25 +134,46 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @Override
     public KTable<K, V> through(Serde<K> keySerde,
                                 Serde<V> valSerde,
+                                StreamPartitioner<K, V> partitioner,
                                 String topic) {
-        to(keySerde, valSerde, topic);
+        to(keySerde, valSerde, partitioner, topic);
 
         return topology.table(keySerde, valSerde, topic);
     }
 
     @Override
+    public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+        return through(keySerde, valSerde, null, topic);
+    }
+
+    @Override
+    public KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
+        return through(null, null, partitioner, topic);
+    }
+
+    @Override
     public KTable<K, V> through(String topic) {
-        return through(null, null, topic);
+        return through(null, null, null, topic);
     }
 
     @Override
     public void to(String topic) {
-        to(null, null, topic);
+        to(null, null, null, topic);
+    }
+
+    @Override
+    public void to(StreamPartitioner<K, V> partitioner, String topic) {
+        to(null, null, partitioner, topic);
     }
 
     @Override
     public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
-        this.toStream().to(keySerde, valSerde, topic);
+        this.toStream().to(keySerde, valSerde, null, topic);
+    }
+
+    @Override
+    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
+        this.toStream().to(keySerde, valSerde, partitioner, topic);
     }
 
     @Override