Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/02/02 19:37:59 UTC

[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

URL: https://github.com/apache/flink/pull/6615#discussion_r253276203
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 ##########
 @@ -55,4 +57,13 @@
 	 * @return null or the target topic
 	 */
 	String getTargetTopic(T element);
+
+	/**
+	 *
+	 * @param element The incoming element to be serialized
+	 * @return collection of headers (maybe empty)
+	 */
+	default Iterable<Map.Entry<String, byte[]>> headers(T element) {
 
 Review comment:
   Let's say we have
   ```
   interface KafkaSerializationSchema<T> {
     ProducerRecord toKafka(T element);
   }
   ```
   The adapter for `KafkaSerializationSchema` will be Kafka-version-specific, so we will need `KafkaSerializationSchemaAdapter08<T>`, `KafkaSerializationSchemaAdapter09<T>`.
   Also, if we are replacing the partitioner, we need to take into account that the partitioner has an initialization method, `open`, and takes extra arguments, so it will become something like this:
   ```
   interface KafkaSerializationSchema<T> {
     void open(int parallelInstanceId, int parallelInstances);
     ProducerRecord toKafka(T element, int[] partitions);
   }
   ```
   In my opinion, that doesn't look clean.
   
   Alternatively, we can keep the partitioner as is, but introduce a version-specific `ProducerRecord repartition(ProducerRecord source, int partition)` in _FlinkKafkaProducerXY_.
   Then FlinkKafkaProducer can do the following:
   ```
     ProducerRecord record = schema.toKafka(value);
     if (flinkKafkaPartitioner != null) {
       int partition = flinkKafkaPartitioner.partition(value, record.key(), record.value(), record.topic(), partitions);
       if (record.partition() == null || partition != record.partition()) {
         record = this.repartition(record, partition);
       }
     }
   ```
   So if we have a `flinkKafkaPartitioner` and it chooses a different partition, we will have an extra object allocation. Maybe that's not so bad?
   What is your opinion?
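
To make the cost of that alternative concrete, here is a minimal, self-contained sketch of the copy-on-different-partition step. It deliberately does not use the Kafka client classes: `SketchRecord`, `SketchPartitioner`, `copyWithPartition`, and `applyPartitioner` are all hypothetical stand-ins invented for illustration (for `ProducerRecord`, the Flink partitioner, and the version-specific copy method proposed above), so treat it as an assumption-laden outline rather than the actual connector code.

```java
/** Hypothetical stand-in for Kafka's ProducerRecord; only the fields used above. */
final class SketchRecord {
    final String topic;
    final Integer partition; // null means "no partition assigned yet"
    final byte[] key;
    final byte[] value;

    SketchRecord(String topic, Integer partition, byte[] key, byte[] value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }
}

/** Hypothetical stand-in for the existing partitioner, which is kept as is. */
interface SketchPartitioner<T> {
    int partition(T element, byte[] key, byte[] value, String topic, int[] partitions);
}

final class RepartitionSketch {

    /** Assumed shape of the version-specific copy: same record, new partition. */
    static SketchRecord copyWithPartition(SketchRecord source, int partition) {
        return new SketchRecord(source.topic, partition, source.key, source.value);
    }

    /** Returns the record to send; allocates a copy only when the partition changes. */
    static <T> SketchRecord applyPartitioner(
            T element, SketchRecord record, SketchPartitioner<T> partitioner, int[] partitions) {
        if (partitioner == null) {
            return record; // no partitioner configured: send the schema's record unchanged
        }
        int partition = partitioner.partition(
                element, record.key, record.value, record.topic, partitions);
        if (record.partition == null || partition != record.partition) {
            return copyWithPartition(record, partition); // the one extra allocation
        }
        return record; // partitioner agrees with the schema: no copy
    }
}
```

In this sketch the extra allocation happens at most once per record, and only when a partitioner is configured and disagrees with the partition the schema already set.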
