Posted to issues@flink.apache.org by "AlexAxeman (via GitHub)" <gi...@apache.org> on 2023/03/27 14:11:39 UTC

[GitHub] [flink-connector-kafka] AlexAxeman opened a new pull request, #18: add kafka header support

AlexAxeman opened a new pull request, #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18

   ## What is the purpose of the change
   
   The default `org.apache.flink.connector.kafka.sink.KafkaSink` does not support adding Kafka record headers when using `KafkaRecordSerializationSchemaBuilder`, which is the most convenient way to create a Kafka sink. This PR adds support for Kafka headers to `KafkaRecordSerializationSchemaBuilder`.
   
   
   ## Brief change log
   
     - *Implemented a `HeaderProducer` that allows creating `Header`s from the input element*
     - *Added setters to `KafkaRecordSerializationSchemaBuilder` to allow setting a `HeaderProducer`*
     - *Added an optional `HeaderProducer` constructor argument to `KafkaRecordSerializationSchemaWrapper` that now uses a `ProducerRecord` constructor that includes the headers.* 
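   To illustrate the change log above, here is a minimal sketch in plain Java. It deliberately avoids the real Flink and Kafka classes: `SimpleHeader`, `SimpleHeaderProducer`, `SimpleRecord`, and `HeaderAwareSerializer` are hypothetical stand-ins, loosely modelled on the names in this PR, so that the example compiles with only the JDK. It shows the shape of the change: a serializable, per-element header function whose output is attached to each outgoing record.
   
   ```java
   import java.io.Serializable;
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   
   /** Hypothetical stand-in for a Kafka record header: a string key and a byte[] value. */
   final class SimpleHeader {
       final String key;
       final byte[] value;
   
       SimpleHeader(String key, byte[] value) {
           this.key = key;
           this.value = value;
       }
   }
   
   /** Hypothetical stand-in for the PR's HeaderProducer: derives headers from an input element. */
   interface SimpleHeaderProducer<IN> extends Serializable {
       List<SimpleHeader> produceHeaders(IN input);
   }
   
   /** Hypothetical stand-in for an outgoing record that carries headers next to its value. */
   final class SimpleRecord<V> {
       final String topic;
       final V value;
       final List<SimpleHeader> headers;
   
       SimpleRecord(String topic, V value, List<SimpleHeader> headers) {
           this.topic = topic;
           this.value = value;
           // Defensive, read-only copy so later changes to the producer's list cannot leak in.
           this.headers = Collections.unmodifiableList(new ArrayList<>(headers));
       }
   }
   
   /** Mirrors the wrapper change: each record is built together with its derived headers. */
   final class HeaderAwareSerializer<IN> {
       private final String topic;
       private final SimpleHeaderProducer<IN> headerProducer;
   
       HeaderAwareSerializer(String topic, SimpleHeaderProducer<IN> headerProducer) {
           this.topic = topic;
           this.headerProducer = headerProducer;
       }
   
       SimpleRecord<IN> serialize(IN element) {
           // Value serialization is omitted; the element is carried as-is to keep the sketch small.
           return new SimpleRecord<>(topic, element, headerProducer.produceHeaders(element));
       }
   }
   ```
   
   Because `SimpleHeaderProducer` has a single abstract method, a caller can pass a lambda such as `in -> Collections.singletonList(new SimpleHeader("len", ...))`.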
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   This change added tests and can be verified as follows:
   
     - *Added tests to `KafkaRecordSerializationSchemaBuilderTest`*
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes?
     - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on code in PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163096185


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer<IN> extends Serializable {
+    default Headers produceHeaders(IN input) {
+        return new RecordHeaders(new ArrayList<>());

Review Comment:
   Actually, on second thought, the default case can simply be that `headerProvider` is `null`. For example, `keySerializer` is nullable for a `KafkaRecordSerializationSchema`; the same can apply to the `headerProvider`. In that case, we no longer need a default implementation for the `HeaderProvider#getHeaders` method.





[GitHub] [flink-connector-kafka] tzulitai commented on pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#issuecomment-1504282565

   @AlexAxeman thanks a lot for your contribution! +1 LGTM.
   
   Since there's other work that is sort of depending on this feature, I'll proceed to merge this PR and address my own comments while merging.




[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on code in PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163104861


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer<IN> extends Serializable {
+    default Headers produceHeaders(IN input) {
+        return new RecordHeaders(new ArrayList<>());

Review Comment:
   @AlexAxeman see my latest comment above about the default case being `headerProvider` is null.
   
   In that case, we no longer need / want to return `Optional`.
   
   i.e.
   
   ```
   interface HeaderProvider<IN> extends Serializable {
       Headers getHeaders(IN input); // no default implementation
   }
   ```
   
   and in `KafkaRecordSerializationSchemaWrapper#serialize()`:
   ```
   if (headerProvider != null) {
       return new ProducerRecord<>(
           ...,
           headerProvider.getHeaders(input));
   } else {
       return new ProducerRecord<>(...); // no headers
   }
   ```
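   A minimal, JDK-only sketch of the nullable-provider branch described above. `SketchHeaderProvider`, `SketchRecord`, and `SketchWrapper` are hypothetical stand-ins for `HeaderProvider`, `ProducerRecord`, and `KafkaRecordSerializationSchemaWrapper`, and headers are simplified to `"key=value"` strings; it only illustrates the control flow, not the real connector code.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   
   /** Hypothetical stand-in for the proposed HeaderProvider; note there is no default method. */
   interface SketchHeaderProvider<IN> {
       List<String> getHeaders(IN input);
   }
   
   /** Hypothetical stand-in for ProducerRecord; headers are simplified to "key=value" strings. */
   final class SketchRecord {
       final String value;
       final List<String> headers;
   
       /** Record without headers, analogous to the header-less record constructors. */
       SketchRecord(String value) {
           this.value = value;
           this.headers = Collections.emptyList(); // shared immutable empty list
       }
   
       SketchRecord(String value, List<String> headers) {
           this.value = value;
           this.headers = Collections.unmodifiableList(new ArrayList<>(headers));
       }
   }
   
   /** Mirrors the branch suggested for KafkaRecordSerializationSchemaWrapper#serialize(). */
   final class SketchWrapper<IN> {
       // Nullable, like keySerializer: null means "the user configured no headers".
       private final SketchHeaderProvider<IN> headerProvider;
   
       SketchWrapper(SketchHeaderProvider<IN> headerProvider) {
           this.headerProvider = headerProvider;
       }
   
       SketchRecord serialize(IN input) {
           String value = String.valueOf(input);
           if (headerProvider != null) {
               return new SketchRecord(value, headerProvider.getHeaders(input));
           } else {
               return new SketchRecord(value); // no headers; the shared empty list is reused
           }
       }
   }
   ```
   
   With this design, the "no headers" case is decided once at construction time rather than by a default method that runs for every record.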






[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on code in PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163427396


##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##########
@@ -86,6 +89,16 @@ public void testDoNotAllowMultipleTopicSelector() {
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    @Test
+    public void testDoNotAllowMultipleHeaderProducers() {

Review Comment:
   I don't think you really need this restriction for the headers.
   
   The key / value serializer setters had the check because there were different entry points for setting the serialization schema, i.e. either via `setKafkaXSerializer` or `setKeySerializationSchema`. We don't want the user to set the key serializer through both entry points, hence the check.
   
   This isn't a problem for the header provider setting. Same goes for the Kafka partitioner, for example - we don't check multiple sets there.
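   To make the distinction concrete, here is a JDK-only sketch of the rule described above: a guard is only needed when two setters write the same field, while a single setter can simply let the last call win. `SketchSchemaBuilder` is hypothetical; its setter names mirror the entry points mentioned in the comment, but their signatures are simplified to `java.util.function.Function` and are not the real builder API. The `IllegalStateException` matches the exception type asserted in the existing builder test.
   
   ```java
   import java.util.function.Function;
   
   /** Hypothetical, simplified builder illustrating when a "set only once" check is useful. */
   final class SketchSchemaBuilder<IN> {
       private Function<IN, byte[]> keySerializer;
       private Function<IN, String> headerProvider; // simplified stand-in for a header provider
   
       // Entry point 1 for the key serializer.
       SketchSchemaBuilder<IN> setKeySerializationSchema(Function<IN, byte[]> schema) {
           checkKeySerializerNotSet();
           this.keySerializer = schema;
           return this;
       }
   
       // Entry point 2 for the same field, so mixing both must be rejected.
       SketchSchemaBuilder<IN> setKafkaKeySerializer(Function<IN, byte[]> serializer) {
           checkKeySerializerNotSet();
           this.keySerializer = serializer;
           return this;
       }
   
       // Single entry point: a second call just replaces the first, so no check is needed.
       SketchSchemaBuilder<IN> setHeaderProvider(Function<IN, String> provider) {
           this.headerProvider = provider;
           return this;
       }
   
       Function<IN, String> headerProvider() {
           return headerProvider;
       }
   
       private void checkKeySerializerNotSet() {
           if (keySerializer != null) {
               throw new IllegalStateException("Key serializer has already been set.");
           }
       }
   }
   ```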





[GitHub] [flink-connector-kafka] boring-cyborg[bot] commented on pull request #18: add kafka header support

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#issuecomment-1485180339

   Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   




[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on code in PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163021963


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer<IN> extends Serializable {

Review Comment:
   > it does not assign the headers, but produces them
   
   In that case, how about `HeaderProvider`? `Provider` seems to be a more canonical name for this.
   
   And correspondingly, the single method would be called `getHeaders(IN input)`.
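   A side benefit of a single-method interface that extends `Serializable`: users can implement it with a lambda, and Java makes a lambda serializable when its target type is `Serializable`. That presumably matters here because Flink serializes user functions it ships to the cluster. The JDK-only sketch below assumes a hypothetical `NamedHeaderProvider` (using a `Map` in place of Kafka `Headers`) and round-trips a lambda through Java serialization to show it survives.
   
   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.util.Map;
   
   /** Hypothetical single-method provider following the suggested getHeaders naming. */
   interface NamedHeaderProvider<IN> extends Serializable {
       Map<String, String> getHeaders(IN input);
   }
   
   final class LambdaSerializationDemo {
       /** Round-trips an object through Java serialization, standing in for shipping it to a worker. */
       @SuppressWarnings("unchecked")
       static <T> T roundTrip(T obj) {
           try {
               ByteArrayOutputStream bytes = new ByteArrayOutputStream();
               try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                   out.writeObject(obj);
               }
               try (ObjectInputStream in =
                       new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                   return (T) in.readObject();
               }
           } catch (IOException | ClassNotFoundException e) {
               throw new IllegalStateException("Serialization round trip failed", e);
           }
       }
   }
   ```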





[GitHub] [flink-connector-kafka] AlexAxeman commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "AlexAxeman (via GitHub)" <gi...@apache.org>.
AlexAxeman commented on code in PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1162890942


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer<IN> extends Serializable {
+    default Headers produceHeaders(IN input) {
+        return new RecordHeaders(new ArrayList<>());

Review Comment:
   Yeah, thought about that. So it should be a `Collections.emptyList()`?





[GitHub] [flink-connector-kafka] tzulitai closed pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai closed pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink
URL: https://github.com/apache/flink-connector-kafka/pull/18




[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on code in PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163083277


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer<IN> extends Serializable {
+    default Headers produceHeaders(IN input) {
+        return new RecordHeaders(new ArrayList<>());

Review Comment:
   I was thinking about `Optional<Headers> getHeaders(IN input)`, and default is returning `Optional.empty()`.
   
   I'm also wondering if it's possible / makes sense to NOT expose the Kafka `Headers` class on the public API. Although the Kafka API seems to be rather stable, in general it's best if we can avoid exposing our dependencies' classes on the public interface.
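   One way the two ideas in this comment could fit together is sketched below, under clearly stated assumptions: the public interface returns `Optional` of a connector-owned header type, and conversion to the dependency's header type happens only inside the connector. `ConnectorHeader`, `OptionalHeaderProvider`, `ExternalHeader`, and `HeaderBoundary` are all hypothetical, JDK-only stand-ins; this is not what was merged (the thread later settled on a nullable provider returning Kafka `Headers`).
   
   ```java
   import java.io.Serializable;
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Optional;
   
   /** Hypothetical connector-owned header type, so the public interface exposes no Kafka class. */
   final class ConnectorHeader {
       final String key;
       final byte[] value;
   
       ConnectorHeader(String key, byte[] value) {
           this.key = key;
           this.value = value;
       }
   }
   
   /** Hypothetical provider returning Optional, as floated in the comment; empty means no headers. */
   interface OptionalHeaderProvider<IN> extends Serializable {
       default Optional<List<ConnectorHeader>> getHeaders(IN input) {
           return Optional.empty();
       }
   }
   
   /** Hypothetical stand-in for the dependency's header class, used only inside the connector. */
   final class ExternalHeader {
       final String key;
       final byte[] value;
   
       ExternalHeader(String key, byte[] value) {
           this.key = key;
           this.value = value;
       }
   }
   
   /** Converts at the boundary, so only internal code depends on the external type. */
   final class HeaderBoundary {
       static <IN> List<ExternalHeader> toExternal(OptionalHeaderProvider<IN> provider, IN input) {
           return provider.getHeaders(input)
                   .map(HeaderBoundary::convert)
                   .orElse(Collections.<ExternalHeader>emptyList()); // shared immutable empty list
       }
   
       private static List<ExternalHeader> convert(List<ConnectorHeader> headers) {
           List<ExternalHeader> out = new ArrayList<>(headers.size());
           for (ConnectorHeader h : headers) {
               out.add(new ExternalHeader(h.key, h.value));
           }
           return out;
       }
   }
   ```
   
   The trade-off is an extra copy per record whenever headers are present, in exchange for keeping the dependency's classes off the public interface.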





[GitHub] [flink-connector-kafka] AlexAxeman commented on pull request #18: add kafka header support

Posted by "AlexAxeman (via GitHub)" <gi...@apache.org>.
AlexAxeman commented on PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#issuecomment-1485181973

   @MartijnVisser @tzulitai 




[GitHub] [flink-connector-kafka] tzulitai commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on code in PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1160175017


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer<IN> extends Serializable {
+    default Headers produceHeaders(IN input) {
+        return new RecordHeaders(new ArrayList<>());

Review Comment:
   Can we have a default implementation that doesn't create an empty `ArrayList` and `RecordHeaders` object on every input record?
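   A sketch of one possible answer, assuming headers are modelled as a JDK `List` (the hypothetical `AllocationFreeHeaderProvider` below is not the Kafka or Flink API): return a single shared immutable empty instance instead of building a new container per record. `Collections.emptyList()`, as suggested elsewhere in the thread, returns one shared immutable object. Sharing is only safe because that object cannot be modified; a shared mutable header container would leak additions across records. The thread ultimately sidestepped the question by making the provider nullable.
   
   ```java
   import java.io.Serializable;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   
   /** Hypothetical provider whose default returns a shared, immutable empty header list. */
   interface AllocationFreeHeaderProvider<IN> extends Serializable {
       /**
        * Default: no headers. Collections.emptyList() returns one shared immutable instance,
        * so calling this once per record allocates nothing.
        */
       default List<Map.Entry<String, byte[]>> getHeaders(IN input) {
           return Collections.emptyList();
       }
   }
   ```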



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer<IN> extends Serializable {

Review Comment:
   nit: Would `HeaderAssigner` be a better name for this? `Producer` already has a meaning within the Kafka-namespace, perhaps it can cause some confusion. WDYT?



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer<IN> extends Serializable {
+    default Headers produceHeaders(IN input) {

Review Comment:
   nit: if we rename the interface to `HeaderAssigner`, then this should be renamed to `assignHeaders` as well.





[GitHub] [flink-connector-kafka] tzulitai commented on pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#issuecomment-1504292430

   Merged via https://github.com/apache/flink-connector-kafka/commit/a7785630e714af303b224c38d9a6caa89a551265
   
   I squashed my changes along with your commit. Please let me know if you find anything odd and would like me to address it. Thanks again!





Re: [PR] [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink [flink-connector-kafka]

Posted by "Sandrock431 (via GitHub)" <gi...@apache.org>.
Sandrock431 commented on PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#issuecomment-1855859711

   We're eagerly awaiting this change as well, and it's not in `3.0.2-1.18` either.




[GitHub] [flink-connector-kafka] RamanVerma commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "RamanVerma (via GitHub)" <gi...@apache.org>.
RamanVerma commented on code in PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1153921860


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer<IN> extends Serializable {
+    default Iterable<Header> produceHeaders(IN input) {

Review Comment:
   I think you can use `Headers` (`org.apache.kafka.common.header.Headers`) instead of `Iterable<Header>`.
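Kafka's `Headers` interface extends `Iterable<Header>`, so returning `Headers` costs callers that only iterate nothing, while also offering methods such as `add(String, byte[])` and `lastHeader(String)`. The sketch below illustrates that trade-off with hypothetical stand-in classes (`SimpleHeader`, `SimpleHeaders`) rather than the real Kafka types, so it runs with the JDK alone.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class HeadersVsIterableSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.header.Header. */
    public static final class SimpleHeader {
        public final String key;
        public final byte[] value;

        public SimpleHeader(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Hypothetical stand-in for Kafka's Headers: like the real interface it is itself
     * an Iterable of headers, but it also supports add() and keyed lookup.
     */
    public static final class SimpleHeaders implements Iterable<SimpleHeader> {
        private final List<SimpleHeader> headers = new ArrayList<>();

        public SimpleHeaders add(String key, byte[] value) {
            headers.add(new SimpleHeader(key, value));
            return this; // returns this so calls can be chained
        }

        /** Returns the most recently added header with this key, or null if there is none. */
        public SimpleHeader lastHeader(String key) {
            for (int i = headers.size() - 1; i >= 0; i--) {
                if (headers.get(i).key.equals(key)) {
                    return headers.get(i);
                }
            }
            return null;
        }

        @Override
        public Iterator<SimpleHeader> iterator() {
            return headers.iterator();
        }
    }

    /** Code that only needs to iterate still accepts the richer type. */
    public static int count(Iterable<SimpleHeader> headers) {
        int n = 0;
        for (SimpleHeader ignored : headers) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        SimpleHeaders headers = new SimpleHeaders()
                .add("trace-id", "abc".getBytes(StandardCharsets.UTF_8))
                .add("trace-id", "def".getBytes(StandardCharsets.UTF_8));
        String last = new String(headers.lastHeader("trace-id").value, StandardCharsets.UTF_8);
        System.out.println(count(headers) + " " + last);
    }
}
```

Running `main` prints `2 def`: the same object serves both as a plain `Iterable` and as a keyed header collection.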




[GitHub] [flink-connector-kafka] AlexAxeman commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "AlexAxeman (via GitHub)" <gi...@apache.org>.
AlexAxeman commented on code in PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1162889963


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer<IN> extends Serializable {

Review Comment:
   Strictly speaking, it does not assign the headers; it produces them. If you're concerned about the naming conflict, how about `HeaderCreator`? (Not that I like that very much...) Otherwise, I'm happy to rename it to `HeaderAssigner`.




[GitHub] [flink-connector-kafka] AlexAxeman commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "AlexAxeman (via GitHub)" <gi...@apache.org>.
AlexAxeman commented on code in PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163098741


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer<IN> extends Serializable {
+    default Headers produceHeaders(IN input) {
+        return new RecordHeaders(new ArrayList<>());

Review Comment:
   Hm... Personally, I don't really like that. The main point of `Optional` is to force callers to handle possibly-`null` values. You don't need that protection when returning collections, since you can always return an empty one, which is what I did. I'd much rather go with an empty (immutable) collection.
   
   Initially, I returned an `Iterable<Header>` and changed that to the Kafka API `Headers` upon request. I'm not leaning either way, so I'm happy to change that back. @RamanVerma, wdyt?
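The argument for an empty collection over an `Optional` can be made concrete with two variants of the same header-producing function. `SimpleHeader`, `headersAsOptional` and `headersAsCollection` are hypothetical names used only for illustration; neither variant is the PR's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class EmptyCollectionVsOptionalSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.header.Header. */
    public static final class SimpleHeader {
        public final String key;
        public final byte[] value;

        public SimpleHeader(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Variant A: wraps the collection in an Optional, so callers must unwrap first. */
    public static Optional<List<SimpleHeader>> headersAsOptional(String input) {
        if (input.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(Collections.singletonList(
                new SimpleHeader("origin", input.getBytes(StandardCharsets.UTF_8))));
    }

    /** Variant B: always returns a collection; "no headers" is an immutable empty list. */
    public static List<SimpleHeader> headersAsCollection(String input) {
        if (input.isEmpty()) {
            return Collections.emptyList();
        }
        return Collections.singletonList(
                new SimpleHeader("origin", input.getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) {
        // Variant A: the caller has to unwrap before it can use the headers.
        int a = headersAsOptional("").map(List::size).orElse(0);

        // Variant B: the caller iterates directly; an empty list simply yields nothing.
        int b = 0;
        for (SimpleHeader ignored : headersAsCollection("")) {
            b++;
        }
        System.out.println(a + " " + b);
    }
}
```

Both variants report zero headers here (`main` prints `0 0`), but variant B needs no unwrapping at any call site, which is the point made in the comment above.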




[GitHub] [flink-connector-kafka] AlexAxeman commented on a diff in pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "AlexAxeman (via GitHub)" <gi...@apache.org>.
AlexAxeman commented on code in PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#discussion_r1163100734


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/HeaderProducer.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/** Creates an {@link Iterable} of {@link Header}s from the input element. */
+@PublicEvolving
+public interface HeaderProducer<IN> extends Serializable {
+    default Headers produceHeaders(IN input) {
+        return new RecordHeaders(new ArrayList<>());

Review Comment:
   I've just read your second comment. Yes, I think that would be better, then. The RecordProducer also handles this gracefully.
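The "handles this gracefully" point can be illustrated as follows. As we understand Kafka's `ProducerRecord` (presumably the class meant by "RecordProducer"), its constructors copy the given headers into a `RecordHeaders`, whose `Iterable` constructor treats `null` as "no headers". The sketch below only mimics that behaviour with hypothetical stand-in types (`SimpleHeader`, `SimpleRecord`); it is not Kafka's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class NullTolerantHeadersSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.header.Header. */
    public static final class SimpleHeader {
        public final String key;
        public final byte[] value;

        public SimpleHeader(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for a record that tolerates a null headers argument. */
    public static final class SimpleRecord {
        private final List<SimpleHeader> headers = new ArrayList<>();

        public SimpleRecord(Iterable<SimpleHeader> headers) {
            // null means "no headers" instead of causing a NullPointerException.
            if (headers != null) {
                for (SimpleHeader header : headers) {
                    this.headers.add(header); // defensive copy of the caller's headers
                }
            }
        }

        public List<SimpleHeader> headers() {
            return Collections.unmodifiableList(headers);
        }
    }

    public static void main(String[] args) {
        SimpleRecord none = new SimpleRecord(null);
        List<SimpleHeader> one = new ArrayList<>();
        one.add(new SimpleHeader("k", new byte[] {1}));
        SimpleRecord some = new SimpleRecord(one);
        System.out.println(none.headers().size() + " " + some.headers().size());
    }
}
```

Running `main` prints `0 1`. If the real record class behaves this way, the wrapper could pass a `null` result straight through instead of branching itself.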




[GitHub] [flink-connector-kafka] AndersSpringborg commented on pull request #18: [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink

Posted by "AndersSpringborg (via GitHub)" <gi...@apache.org>.
AndersSpringborg commented on PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#issuecomment-1719440804

   Which release will this be published in? I cannot see it in the latest release, [3.0.0-1.17](https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/3.0.0-1.17).



Re: [PR] [FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink [flink-connector-kafka]

Posted by "AlexAxeman (via GitHub)" <gi...@apache.org>.
AlexAxeman commented on PR #18:
URL: https://github.com/apache/flink-connector-kafka/pull/18#issuecomment-1855945984

   @tzulitai Do you know what happened with this change?

