You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/07/18 07:33:15 UTC

[camel] branch camel-2.21.x updated: CAMEL-12651 - Allow to override serializing and deserializing default mechanism for kafka headers

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-2.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.21.x by this push:
     new 6a654fe  CAMEL-12651 - Allow to override serializing and deserializing default mechanism for kafka headers
6a654fe is described below

commit 6a654fe5620ab26d596a94a51f13ffdbc9366810
Author: tdanylchuk <da...@gmail.com>
AuthorDate: Sat Jul 14 18:13:51 2018 +0300

    CAMEL-12651 - Allow to override serializing and deserializing default mechanism for kafka headers
---
 .../camel-kafka/src/main/docs/kafka-component.adoc | 14 +++--
 .../camel/component/kafka/KafkaConfiguration.java  | 35 +++++++++++++
 .../camel/component/kafka/KafkaConsumer.java       |  9 ++--
 .../camel/component/kafka/KafkaProducer.java       | 38 ++++----------
 .../serde/DefaultKafkaHeaderDeserializer.java      | 25 +++++++++
 .../kafka/serde/DefaultKafkaHeaderSerializer.java  | 53 +++++++++++++++++++
 .../kafka/serde/KafkaHeaderDeserializer.java       | 25 +++++++++
 .../kafka/serde/KafkaHeaderSerializer.java         | 25 +++++++++
 .../component/kafka/KafkaConsumerFullTest.java     | 19 +++++++
 .../component/kafka/KafkaProducerFullTest.java     | 28 ++++++++--
 .../serde/DefaultKafkaHeaderDeserializerTest.java  | 39 ++++++++++++++
 .../serde/DefaultKafkaHeaderSerializerTest.java    | 61 ++++++++++++++++++++++
 .../springboot/KafkaComponentConfiguration.java    | 36 +++++++++++++
 13 files changed, 369 insertions(+), 38 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index bc4ed4b..a657378 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -72,7 +72,7 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (91 parameters):
+==== Query Parameters (93 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -98,6 +98,7 @@ with the following path and query parameters:
 | *fetchWaitMaxMs* (consumer) | The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes | 500 | Integer
 | *groupId* (consumer) | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group. This option is required for consumers. |  | String
 | *heartbeatIntervalMs* (consumer) | The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.  [...]
+| *kafkaHeaderDeserializer* (consumer) | Sets a custom KafkaHeaderDeserializer for deserializing kafka header values to camel header values. |  | KafkaHeaderDeserializer
 | *keyDeserializer* (consumer) | Deserializer class for key that implements the Deserializer interface. | org.apache.kafka.common.serialization.StringDeserializer | String
 | *maxPartitionFetchBytes* (consumer) | The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be partitions max.partition.fetch.bytes. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. | 1048576 | Integer
 | *maxPollIntervalMs* (consumer) | The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. |  | Long
@@ -117,6 +118,7 @@ with the following path and query parameters:
 | *compressionCodec* (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none, gzip and snappy. | none | String
 | *connectionMaxIdleMs* (producer) | Close idle connections after the number of milliseconds specified by this config. | 540000 | Integer
 | *enableIdempotence* (producer) | If set to 'true' the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries may write duplicates of the retried message in the stream. If set to true this option will require max.in.flight.requests.per.connection to be set to 1 and retries cannot be zero and additionally acks must be set to 'all'. | false | boolean
+| *kafkaHeaderSerializer* (producer) | Sets a custom KafkaHeaderSerializer for serializing camel header values to kafka header values. |  | KafkaHeaderSerializer
 | *key* (producer) | The record key (or null if no key is specified). If this option has been configured then it take precedence over header link KafkaConstantsKEY |  | String
 | *keySerializerClass* (producer) | The serializer class for keys (defaults to the same as for messages if nothing is given). | org.apache.kafka.common.serialization.StringSerializer | String
 | *lingerMs* (producer) | The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delaythat is, rather than immediately sending out a record the producer will wa [...]
@@ -427,8 +429,14 @@ Producing flow backed by same behaviour - camel headers of particular exchange w
 
 Since kafka headers allows only `byte[]` values, in order camel exchnage header to be propagated its value should be serialized to `bytes[]`,
 otherwise header will be skipped.
-Following header value types are supported: `String`, `Integer`, `Long`, `Double`, `byte[]`.
-Note: all headers propagated *from* kafka *to* camel exchange will contain `byte[]` value.
+Following header value types are supported: `String`, `Integer`, `Long`, `Double`, `Boolean`, `byte[]`.
+Note: all headers propagated *from* kafka *to* camel exchange will contain `byte[]` value by default.
+To override the default behaviour, the following URI parameters can be set: `kafkaHeaderDeserializer` for the `from` route and `kafkaHeaderSerializer` for the `to` route. Example:
+```
+from("kafka:my_topic?kafkaHeaderDeserializer=#myDeserializer")
+...
+.to("kafka:my_topic?kafkaHeaderSerializer=#mySerializer")
+```
 
 By default all headers are being filtered by `KafkaHeaderFilterStrategy`.
 Strategy filters out headers which start with `Camel` or `org.apache.camel` prefixes.
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index dabd475..6576fd7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -24,6 +24,10 @@ import java.util.stream.Collectors;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
+import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
+import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.spi.Metadata;
@@ -66,6 +70,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     private int consumerStreams = 10;
     @UriParam(label = "consumer", defaultValue = "1")
     private int consumersCount = 1;
+    @UriParam(label = "consumer", description = "To use a custom KafkaHeaderDeserializer to deserialize kafka headers values")
+    private KafkaHeaderDeserializer kafkaHeaderDeserializer = new DefaultKafkaHeaderDeserializer();
 
     //interceptor.classes
     @UriParam(label = "common,monitoring")
@@ -221,6 +227,9 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     //reconnect.backoff.ms
     @UriParam(label = "producer", defaultValue = "false")
     private boolean enableIdempotence;
+    @UriParam(label = "producer", description = "To use a custom KafkaHeaderSerializer to serialize kafka headers values")
+    private KafkaHeaderSerializer kafkaHeaderSerializer = new DefaultKafkaHeaderSerializer();
+
     //reconnect.backoff.max.ms
     @UriParam(label = "common", defaultValue = "1000")
     private Integer reconnectBackoffMaxMs = 1000;
@@ -1614,4 +1623,30 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
         this.headerFilterStrategy = headerFilterStrategy;
     }
 
+    public KafkaHeaderDeserializer getKafkaHeaderDeserializer() {
+        return kafkaHeaderDeserializer;
+    }
+
+    /**
+     * Sets a custom KafkaHeaderDeserializer for deserializing kafka header values to camel header values.
+     *
+     * @param kafkaHeaderDeserializer custom kafka header deserializer to be used
+     */
+    public void setKafkaHeaderDeserializer(final KafkaHeaderDeserializer kafkaHeaderDeserializer) {
+        this.kafkaHeaderDeserializer = kafkaHeaderDeserializer;
+    }
+
+    public KafkaHeaderSerializer getKafkaHeaderSerializer() {
+        return kafkaHeaderSerializer;
+    }
+
+    /**
+     * Sets a custom KafkaHeaderSerializer for serializing camel header values to kafka header values.
+     *
+     * @param kafkaHeaderSerializer custom kafka header serializer to be used
+     */
+    public void setKafkaHeaderSerializer(final KafkaHeaderSerializer kafkaHeaderSerializer) {
+        this.kafkaHeaderSerializer = kafkaHeaderSerializer;
+    }
+
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index c585e05..5f56270 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -31,6 +31,7 @@ import java.util.stream.StreamSupport;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.StateRepository;
@@ -283,7 +284,7 @@ public class KafkaConsumer extends DefaultConsumer {
                                 }
                                 Exchange exchange = endpoint.createKafkaExchange(record);
 
-                                propagateHeaders(record, exchange, endpoint.getConfiguration().getHeaderFilterStrategy());
+                                propagateHeaders(record, exchange, endpoint.getConfiguration());
 
                                 // if not auto commit then we have additional information on the exchange
                                 if (!isAutoCommitEnabled()) {
@@ -428,10 +429,12 @@ public class KafkaConsumer extends DefaultConsumer {
         }
     }
 
-    private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+    private void propagateHeaders(ConsumerRecord<Object, Object> record, Exchange exchange, KafkaConfiguration kafkaConfiguration) {
+        HeaderFilterStrategy headerFilterStrategy = kafkaConfiguration.getHeaderFilterStrategy();
+        KafkaHeaderDeserializer headerDeserializer = kafkaConfiguration.getKafkaHeaderDeserializer();
         StreamSupport.stream(record.headers().spliterator(), false)
                 .filter(header -> shouldBeFiltered(header, exchange, headerFilterStrategy))
-                .forEach(header -> exchange.getIn().setHeader(header.key(), header.value()));
+                .forEach(header -> exchange.getIn().setHeader(header.key(), headerDeserializer.deserialize(header.key(), header.value())));
     }
 
     private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 71e8d3f..c113826 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.URISupport;
@@ -43,6 +44,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 
 public class KafkaProducer extends DefaultAsyncProducer {
@@ -176,8 +178,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
         final boolean hasMessageKey = messageKey != null;
 
         // extracting headers which need to be propagated
-        HeaderFilterStrategy headerFilterStrategy = endpoint.getConfiguration().getHeaderFilterStrategy();
-        List<Header> propagatedHeaders = getPropagatedHeaders(exchange, headerFilterStrategy);
+        List<Header> propagatedHeaders = getPropagatedHeaders(exchange, endpoint.getConfiguration());
 
         Object msg = exchange.getIn().getBody();
 
@@ -233,10 +234,12 @@ public class KafkaProducer extends DefaultAsyncProducer {
         return Collections.singletonList(record).iterator();
     }
 
-    private List<Header> getPropagatedHeaders(Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+    private List<Header> getPropagatedHeaders(Exchange exchange, KafkaConfiguration getConfiguration) {
+        HeaderFilterStrategy headerFilterStrategy = getConfiguration.getHeaderFilterStrategy();
+        KafkaHeaderSerializer headerSerializer = getConfiguration.getKafkaHeaderSerializer();
         return exchange.getIn().getHeaders().entrySet().stream()
                 .filter(entry -> shouldBeFiltered(entry, exchange, headerFilterStrategy))
-                .map(this::getRecordHeader)
+                .map(entry -> getRecordHeader(entry, headerSerializer))
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
     }
@@ -245,37 +248,14 @@ public class KafkaProducer extends DefaultAsyncProducer {
         return !headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange);
     }
 
-    private RecordHeader getRecordHeader(Map.Entry<String, Object> entry) {
-        byte[] headerValue = getHeaderValue(entry.getValue());
+    private RecordHeader getRecordHeader(Map.Entry<String, Object> entry, KafkaHeaderSerializer headerSerializer) {
+        byte[] headerValue = headerSerializer.serialize(entry.getKey(), entry.getValue());
         if (headerValue == null) {
             return null;
         }
         return new RecordHeader(entry.getKey(), headerValue);
     }
 
-    private byte[] getHeaderValue(Object value) {
-        if (value instanceof String) {
-            return ((String) value).getBytes();
-        } else if (value instanceof Long) {
-            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
-            buffer.putLong((Long) value);
-            return buffer.array();
-        } else if (value instanceof Integer) {
-            ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
-            buffer.putInt((Integer) value);
-            return buffer.array();
-        } else if (value instanceof Double) {
-            ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
-            buffer.putDouble((Double) value);
-            return buffer.array();
-        } else if (value instanceof byte[]) {
-            return (byte[]) value;
-        }
-        log.debug("Cannot propagate header value of type[{}], skipping... "
-                + "Supported types: String, Integer, Long, Double, byte[].", value != null ? value.getClass() : "null");
-        return null;
-    }
-
     @Override
     @SuppressWarnings({"unchecked", "rawtypes"})
     // Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderDeserializer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderDeserializer.java
new file mode 100644
index 0000000..5d26d08
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderDeserializer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.serde;
+
+public class DefaultKafkaHeaderDeserializer implements KafkaHeaderDeserializer {
+
+    @Override
+    public Object deserialize(final String key, final byte[] value) {
+        return value;
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java
new file mode 100644
index 0000000..45db0e8
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.serde;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultKafkaHeaderSerializer implements KafkaHeaderSerializer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaHeaderSerializer.class);
+
+    @Override
+    public byte[] serialize(final String key, final Object value) {
+        if (value instanceof String) {
+            return ((String) value).getBytes();
+        } else if (value instanceof Long) {
+            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+            buffer.putLong((Long) value);
+            return buffer.array();
+        } else if (value instanceof Integer) {
+            ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+            buffer.putInt((Integer) value);
+            return buffer.array();
+        } else if (value instanceof Double) {
+            ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
+            buffer.putDouble((Double) value);
+            return buffer.array();
+        } else if (value instanceof Boolean) {
+            return value.toString().getBytes();
+        } else if (value instanceof byte[]) {
+            return (byte[]) value;
+        }
+        LOG.debug("Cannot propagate header value of type[{}], skipping... "
+                + "Supported types: String, Integer, Long, Double, byte[].", value != null ? value.getClass() : "null");
+        return null;
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaHeaderDeserializer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaHeaderDeserializer.java
new file mode 100644
index 0000000..282fa95
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaHeaderDeserializer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.serde;
+
+/**
+ * Deserializer for kafka header value.
+ */
+public interface KafkaHeaderDeserializer {
+
+    Object deserialize(String key, byte[] value);
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaHeaderSerializer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaHeaderSerializer.java
new file mode 100644
index 0000000..1f62655
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/serde/KafkaHeaderSerializer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.serde;
+
+/**
+ * Serializer for kafka header value.
+ */
+public interface KafkaHeaderSerializer {
+
+    byte[] serialize(String key, Object value);
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index d99a983..a3509fc 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -24,7 +24,9 @@ import java.util.stream.StreamSupport;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.junit.After;
@@ -71,6 +73,13 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
         };
     }
 
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myHeaderDeserializer", new MyKafkaHeaderDeserializer());
+        return jndi;
+    }
+
     @Test
     public void kafkaMessageIsConsumedByCamel() throws InterruptedException, IOException {
         String propagatedHeaderKey = "PropagatedCustomHeader";
@@ -158,5 +167,15 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
         }
         to.assertIsSatisfied(3000);
     }
+
+    @Test
+    public void headerDeserializerCouldBeOverridden() {
+        KafkaEndpoint kafkaEndpoint = context.getEndpoint(
+                "kafka:random_topic?kafkaHeaderDeserializer=#myHeaderDeserializer", KafkaEndpoint.class);
+        assertIsInstanceOf(MyKafkaHeaderDeserializer.class, kafkaEndpoint.getConfiguration().getKafkaHeaderDeserializer());
+    }
+
+    private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
+    }
 }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 9e8b8b8..4ea82d1 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -36,6 +36,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.DefaultHeaderFilterStrategy;
 import org.apache.camel.impl.JndiRegistry;
@@ -102,6 +104,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry jndi = super.createRegistry();
         jndi.bind("myStrategy", new MyHeaderFilterStrategy());
+        jndi.bind("myHeaderSerializer", new MyKafkaHeadersSerializer());
         return jndi;
     }
 
@@ -317,13 +320,19 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         String propagatedBytesHeaderKey = "PROPAGATED_BYTES_HEADER";
         byte[] propagatedBytesHeaderValue = new byte[]{121, 34, 34, 54, 5, 3, 54, -34};
 
+        String propagatedBooleanHeaderKey = "PROPAGATED_BOOLEAN_HEADER";
+        Boolean propagatedBooleanHeaderValue = Boolean.TRUE;
+
         Map<String, Object> camelHeaders = new HashMap<>();
         camelHeaders.put(propagatedStringHeaderKey, propagatedStringHeaderValue);
         camelHeaders.put(propagatedIntegerHeaderKey, propagatedIntegerHeaderValue);
         camelHeaders.put(propagatedLongHeaderKey, propagatedLongHeaderValue);
         camelHeaders.put(propagatedDoubleHeaderKey, propagatedDoubleHeaderValue);
         camelHeaders.put(propagatedBytesHeaderKey, propagatedBytesHeaderValue);
+        camelHeaders.put(propagatedBooleanHeaderKey, propagatedBooleanHeaderValue);
+
         camelHeaders.put("CustomObjectHeader", new Object());
+        camelHeaders.put("CustomNullObjectHeader", null);
         camelHeaders.put("CamelFilteredHeader", "CamelFilteredHeader value");
 
         CountDownLatch messagesLatch = new CountDownLatch(1);
@@ -337,8 +346,8 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         ConsumerRecord<String, String> record = records.get(0);
         Headers headers = record.headers();
         assertNotNull("Kafka Headers should not be null.", headers);
-        // we have 5 headers and 1 header with breadcrumbId
-        assertEquals("Six propagated headers are expected.", 6, headers.toArray().length);
+        // we have 6 headers and 1 header with breadcrumbId
+        assertEquals("Seven propagated header is expected.", 7, headers.toArray().length);
         assertEquals("Propagated string value received", propagatedStringHeaderValue,
                 new String(getHeaderValue(propagatedStringHeaderKey, headers)));
         assertEquals("Propagated integer value received", propagatedIntegerHeaderValue,
@@ -348,14 +357,24 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         assertEquals("Propagated double value received", propagatedDoubleHeaderValue,
                 new Double(ByteBuffer.wrap(getHeaderValue(propagatedDoubleHeaderKey, headers)).getDouble()));
         assertArrayEquals("Propagated byte array value received", propagatedBytesHeaderValue, getHeaderValue(propagatedBytesHeaderKey, headers));
+        assertEquals("Propagated boolean value received", propagatedBooleanHeaderValue,
+                Boolean.valueOf(new String(getHeaderValue(propagatedBooleanHeaderKey, headers))));
     }
 
     @Test
     public void headerFilterStrategyCouldBeOverridden() {
-        KafkaEndpoint kafkaEndpoint = context.getEndpoint("kafka:TOPIC_PROPAGATED_HEADERS?headerFilterStrategy=#myStrategy", KafkaEndpoint.class);
+        KafkaEndpoint kafkaEndpoint = context.getEndpoint(
+                "kafka:TOPIC_PROPAGATED_HEADERS?headerFilterStrategy=#myStrategy", KafkaEndpoint.class);
         assertIsInstanceOf(MyHeaderFilterStrategy.class, kafkaEndpoint.getConfiguration().getHeaderFilterStrategy());
     }
 
+    @Test
+    public void headerSerializerCouldBeOverridden() {
+        KafkaEndpoint kafkaEndpoint = context.getEndpoint(
+                "kafka:TOPIC_PROPAGATED_HEADERS?kafkaHeaderSerializer=#myHeaderSerializer", KafkaEndpoint.class);
+        assertIsInstanceOf(MyKafkaHeadersSerializer.class, kafkaEndpoint.getConfiguration().getKafkaHeaderSerializer());
+    }
+
     private byte[] getHeaderValue(String headerKey, Headers headers) {
         Header foundHeader = StreamSupport.stream(headers.spliterator(), false)
                 .filter(header -> header.key().equals(headerKey))
@@ -438,4 +457,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     private static class MyHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
     }
 
+    private static class MyKafkaHeadersSerializer extends DefaultKafkaHeaderSerializer {
+    }
+
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderDeserializerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderDeserializerTest.java
new file mode 100644
index 0000000..4e78114
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderDeserializerTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.serde;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThat;
+
+public class DefaultKafkaHeaderDeserializerTest {
+
+    private KafkaHeaderDeserializer deserializer = new DefaultKafkaHeaderDeserializer();
+
+    @Test
+    public void shouldDeserializeAsIs() {
+        byte[] value = new byte[]{0, 4, -2, 54, 126};
+
+        Object deserializedValue = deserializer.deserialize("someKey", value);
+
+        assertThat(deserializedValue, CoreMatchers.instanceOf(byte[].class));
+        assertArrayEquals(value, (byte[]) deserializedValue);
+    }
+
+}
\ No newline at end of file
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java
new file mode 100644
index 0000000..6b90cbc
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/serde/DefaultKafkaHeaderSerializerTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.serde;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertArrayEquals;
+
+@RunWith(Parameterized.class)
+public class DefaultKafkaHeaderSerializerTest {
+
+    private KafkaHeaderSerializer serializer = new DefaultKafkaHeaderSerializer();
+
+    private Object value;
+    private byte[] expectedResult;
+
+    public DefaultKafkaHeaderSerializerTest(Object value, byte[] expectedResult) {
+        this.value = value;
+        this.expectedResult = expectedResult;
+    }
+
+    @Test
+    public void serialize() {
+        byte[] result = serializer.serialize("someKey", value);
+
+        assertArrayEquals(expectedResult, result);
+    }
+
+    @Parameterized.Parameters
+    public static Collection primeNumbers() {
+        return Arrays.asList(new Object[][]{
+            {Boolean.TRUE, "true".getBytes()},               //boolean
+            {-12, new byte[]{-1, -1, -1, -12}},              //integer
+            {19L, new byte[]{0, 0, 0, 0, 0, 0, 0, 19}},      //long
+            {22.0D, new byte[]{64, 54, 0, 0, 0, 0, 0, 0}},   //double
+            {"someValue", "someValue".getBytes()},           //string
+            {new byte[]{0, 2, -43}, new byte[]{0, 2, -43}},  //byte[]
+            {null, null},                                    //null
+            {new Object(), null}                             //unknown type
+        });
+    }
+}
\ No newline at end of file
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index e46ad41..bcdd2f2 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.kafka.springboot;
 import java.util.concurrent.ExecutorService;
 import javax.annotation.Generated;
 import org.apache.camel.component.kafka.KafkaManualCommitFactory;
+import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
+import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon;
@@ -757,6 +759,22 @@ public class KafkaComponentConfiguration
          * Camel message.
          */
         private HeaderFilterStrategy headerFilterStrategy;
+        /**
+         * Sets a custom KafkaHeaderDeserializer for deserializing Kafka header
+         * values to Camel header values.
+         * 
+         * @param kafkaHeaderDeserializer
+         *            custom kafka header deserializer to be used
+         */
+        private KafkaHeaderDeserializer kafkaHeaderDeserializer;
+        /**
+         * Sets a custom KafkaHeaderSerializer for serializing Camel header
+         * values to Kafka header values.
+         * 
+         * @param kafkaHeaderSerializer
+         *            custom kafka header serializer to be used
+         */
+        private KafkaHeaderSerializer kafkaHeaderSerializer;
 
         public Boolean getTopicIsPattern() {
             return topicIsPattern;
@@ -1467,5 +1485,23 @@ public class KafkaComponentConfiguration
                 HeaderFilterStrategy headerFilterStrategy) {
             this.headerFilterStrategy = headerFilterStrategy;
         }
+
+        public KafkaHeaderDeserializer getKafkaHeaderDeserializer() {
+            return kafkaHeaderDeserializer;
+        }
+
+        public void setKafkaHeaderDeserializer(
+                KafkaHeaderDeserializer kafkaHeaderDeserializer) {
+            this.kafkaHeaderDeserializer = kafkaHeaderDeserializer;
+        }
+
+        public KafkaHeaderSerializer getKafkaHeaderSerializer() {
+            return kafkaHeaderSerializer;
+        }
+
+        public void setKafkaHeaderSerializer(
+                KafkaHeaderSerializer kafkaHeaderSerializer) {
+            this.kafkaHeaderSerializer = kafkaHeaderSerializer;
+        }
     }
 }
\ No newline at end of file