Posted to commits@camel.apache.org by da...@apache.org on 2021/03/22 08:24:11 UTC

[camel] branch master updated: CAMEL-14980: camel-kafka - Consumer should only re-connect on retryable exceptions. Other severe exceptions should be propagated. (#5248)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 79d878e  CAMEL-14980: camel-kafka - Consumer should only re-connect on retryable exceptions. Other severe exceptions should be propagated. (#5248)
79d878e is described below

commit 79d878e62b32dc5bff89a43e45932f67a61db58e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 22 09:23:44 2021 +0100

    CAMEL-14980: camel-kafka - Consumer should only re-connect on retryable exceptions. Other severe exceptions should be propagated. (#5248)
    
    CAMEL-14980: camel-kafka - Consumer should only re-connect on retryable exceptions. Other severe exceptions should be propagated.
---
 .../org/apache/camel/catalog/components/kafka.json |   1 +
 .../apache/camel/catalog/docs/kafka-component.adoc |  30 ++-
 .../component/kafka/KafkaComponentConfigurer.java  |  14 +-
 .../component/kafka/KafkaEndpointConfigurer.java   |   6 +
 .../component/kafka/KafkaEndpointUriFactory.java   |   3 +-
 .../org/apache/camel/component/kafka/kafka.json    |   3 +
 .../camel-kafka/src/main/docs/kafka-component.adoc |  30 ++-
 .../kafka/DefaultPollExceptionStrategy.java        |  44 ++++
 .../camel/component/kafka/KafkaComponent.java      |  34 +++
 .../camel/component/kafka/KafkaConfiguration.java  |  21 ++
 .../camel/component/kafka/KafkaConsumer.java       | 278 +++++++++++++++------
 .../component/kafka/PollExceptionStrategy.java     |  34 +++
 .../apache/camel/component/kafka/PollOnError.java  |  31 +++
 .../dsl/KafkaComponentBuilderFactory.java          |  45 ++++
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java  |  62 +++++
 .../modules/ROOT/pages/kafka-component.adoc        |  30 ++-
 .../ROOT/pages/camel-3x-upgrade-guide-3_9.adoc     |  15 +-
 17 files changed, 594 insertions(+), 87 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index a23b6e7..c48fbca 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -58,6 +58,7 @@
     "specificAvroReader": { "kind": "property", "displayName": "Specific Avro Reader", "group": "consumer", "label": "confluent,consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This enables the use of a specific Avro reader for use with the Confluent Platf [...]
     "topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic num [...]
     "valueDeserializer": { "kind": "property", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value th [...]
+    "kafkaConsumerReconnectExceptionStrategy": { "kind": "property", "displayName": "Kafka Consumer Reconnect Exception Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka [...]
     "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in ca [...]
     "bufferMemorySize": { "kind": "property", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to be [...]
     "compressionCodec": { "kind": "property", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify the [...]
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
index cfaf481..78044a8 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
@@ -41,7 +41,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 99 options, which are listed below.
+The Kafka component supports 101 options, which are listed below.
 
 
 
@@ -78,6 +78,7 @@ The Kafka component supports 99 options, which are listed below.
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. |  | StateRepository
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
+| *pollOnError* (consumer) | What to do if Kafka throws an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll the next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message. RECONNECT will re-connect the consumer and try to poll the message again. RETRY  [...]
 | *pollTimeoutMs* (consumer) | The timeout used when polling the KafkaConsumer. | 5000 | Long
 | *seekTo* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning. There are 2 enums and the value can be one of: beginning, end |  | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect failures when using Kafka's group management facilities. | 10000 | Integer
@@ -85,6 +86,7 @@ The Kafka component supports 99 options, which are listed below.
 | *topicIsPattern* (consumer) | Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic number of topics matching the pattern. | false | boolean
 | *valueDeserializer* (consumer) | Deserializer class for value that implements the Deserializer interface. | org.apache.kafka.common.serialization.StringDeserializer | String
 | *kafkaManualCommitFactory* (consumer) | Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in case special logic is needed when doing manual commits that deviates from the default implementation that comes out of the box. |  | KafkaManualCommitFactory
+| *pollExceptionStrategy* (consumer) | *Autowired* To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while polling messages. |  | PollExceptionStrategy
 | *bufferMemorySize* (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additio [...]
 | *compressionCodec* (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none, gzip and snappy. There are 4 enums and the value can be one of: none, gzip, snappy, lz4 | none | String
 | *connectionMaxIdleMs* (producer) | Close idle connections after the number of milliseconds specified by this config. | 540000 | Integer
@@ -171,7 +173,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (96 parameters):
+=== Query Parameters (97 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -206,6 +208,7 @@ with the following path and query parameters:
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. |  | StateRepository
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
+| *pollOnError* (consumer) | What to do if Kafka throws an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll the next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message. RECONNECT will re-connect the consumer and try to poll the message again. RETRY  [...]
 | *pollTimeoutMs* (consumer) | The timeout used when polling the KafkaConsumer. | 5000 | Long
 | *seekTo* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning. There are 2 enums and the value can be one of: beginning, end |  | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect failures when using Kafka's group management facilities. | 10000 | Integer
@@ -321,6 +324,29 @@ After the message is sent to Kafka, the following headers are available
 |===
 
 
+== Consumer error handling
+
+While the Kafka consumer is polling messages from the Kafka broker, errors can happen. This section describes what happens and what
+you can configure.
+
+The consumer may throw an exception when invoking the Kafka `poll` API, for example if a message cannot be de-serialized due to invalid data,
+and for many other kinds of errors. Those errors come in the form of `KafkaException`, which is either _retryable_ or not. The exceptions
+which can be retried (`RetriableException`) will be retried (with a poll timeout in between). All other kinds of exceptions are
+handled according to the _pollOnError_ configuration. This configuration has the following values:
+
+* DISCARD will discard the message and continue to poll the next message.
+* ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message.
+* RECONNECT will re-connect the consumer and try to poll the message again.
+* RETRY will let the consumer retry polling the same message again.
+* STOP will stop the consumer (it has to be manually started/restarted if the consumer should be able to consume messages again).
+
+The default is *ERROR_HANDLER*, which will let Camel's error handler (if any is configured) process the caused exception
+and afterwards continue to poll the next message. This behavior is similar to the _bridgeErrorHandler_ option that
+Camel components have.
+
+For advanced control, a custom implementation of `org.apache.camel.component.kafka.PollExceptionStrategy` can be configured
+on the component level, which allows controlling which exceptions cause which of the strategies above.
+
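+For example, the _pollOnError_ option can be set directly on the endpoint. The route below re-connects the
+consumer instead of calling Camel's error handler when a non-retryable exception occurs (the topic name and
+broker address are illustrative):
+
+[source,java]
+----
+from("kafka:mytopic?brokers=localhost:9092&pollOnError=RECONNECT")
+    .to("log:received");
+----
+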
 == Samples
 
 === Consuming messages from Kafka
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index e44e7a1..9848b89 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -134,6 +134,10 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
         case "partitionkey":
         case "partitionKey": getOrCreateConfiguration(target).setPartitionKey(property(camelContext, java.lang.Integer.class, value)); return true;
         case "partitioner": getOrCreateConfiguration(target).setPartitioner(property(camelContext, java.lang.String.class, value)); return true;
+        case "pollexceptionstrategy":
+        case "pollExceptionStrategy": target.setPollExceptionStrategy(property(camelContext, org.apache.camel.component.kafka.PollExceptionStrategy.class, value)); return true;
+        case "pollonerror":
+        case "pollOnError": getOrCreateConfiguration(target).setPollOnError(property(camelContext, org.apache.camel.component.kafka.PollOnError.class, value)); return true;
         case "polltimeoutms":
         case "pollTimeoutMs": getOrCreateConfiguration(target).setPollTimeoutMs(property(camelContext, java.lang.Long.class, value)); return true;
         case "producerbatchsize":
@@ -226,7 +230,7 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
 
     @Override
     public String[] getAutowiredNames() {
-        return new String[]{"kafkaClientFactory"};
+        return new String[]{"kafkaClientFactory","pollExceptionStrategy"};
     }
 
     @Override
@@ -338,6 +342,10 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
         case "partitionkey":
         case "partitionKey": return java.lang.Integer.class;
         case "partitioner": return java.lang.String.class;
+        case "pollexceptionstrategy":
+        case "pollExceptionStrategy": return org.apache.camel.component.kafka.PollExceptionStrategy.class;
+        case "pollonerror":
+        case "pollOnError": return org.apache.camel.component.kafka.PollOnError.class;
         case "polltimeoutms":
         case "pollTimeoutMs": return java.lang.Long.class;
         case "producerbatchsize":
@@ -538,6 +546,10 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
         case "partitionkey":
         case "partitionKey": return getOrCreateConfiguration(target).getPartitionKey();
         case "partitioner": return getOrCreateConfiguration(target).getPartitioner();
+        case "pollexceptionstrategy":
+        case "pollExceptionStrategy": return target.getPollExceptionStrategy();
+        case "pollonerror":
+        case "pollOnError": return getOrCreateConfiguration(target).getPollOnError();
         case "polltimeoutms":
         case "pollTimeoutMs": return getOrCreateConfiguration(target).getPollTimeoutMs();
         case "producerbatchsize":
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index 79855c6..dd61a57 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -124,6 +124,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement
         case "partitionkey":
         case "partitionKey": target.getConfiguration().setPartitionKey(property(camelContext, java.lang.Integer.class, value)); return true;
         case "partitioner": target.getConfiguration().setPartitioner(property(camelContext, java.lang.String.class, value)); return true;
+        case "pollonerror":
+        case "pollOnError": target.getConfiguration().setPollOnError(property(camelContext, org.apache.camel.component.kafka.PollOnError.class, value)); return true;
         case "polltimeoutms":
         case "pollTimeoutMs": target.getConfiguration().setPollTimeoutMs(property(camelContext, java.lang.Long.class, value)); return true;
         case "producerbatchsize":
@@ -318,6 +320,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement
         case "partitionkey":
         case "partitionKey": return java.lang.Integer.class;
         case "partitioner": return java.lang.String.class;
+        case "pollonerror":
+        case "pollOnError": return org.apache.camel.component.kafka.PollOnError.class;
         case "polltimeoutms":
         case "pollTimeoutMs": return java.lang.Long.class;
         case "producerbatchsize":
@@ -513,6 +517,8 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement
         case "partitionkey":
         case "partitionKey": return target.getConfiguration().getPartitionKey();
         case "partitioner": return target.getConfiguration().getPartitioner();
+        case "pollonerror":
+        case "pollOnError": return target.getConfiguration().getPollOnError();
         case "polltimeoutms":
         case "pollTimeoutMs": return target.getConfiguration().getPollTimeoutMs();
         case "producerbatchsize":
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
index b3e4c1b..c9ad3c5 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component.
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(97);
+        Set<String> props = new HashSet<>(98);
         props.add("synchronous");
         props.add("queueBufferingMaxMessages");
         props.add("allowManualCommit");
@@ -38,6 +38,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component.
         props.add("breakOnFirstError");
         props.add("requestRequiredAcks");
         props.add("enableIdempotence");
+        props.add("pollOnError");
         props.add("fetchWaitMaxMs");
         props.add("retries");
         props.add("maxPollRecords");
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index a23b6e7..b194d54 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -52,6 +52,7 @@
     "maxPollRecords": { "kind": "property", "displayName": "Max Poll Records", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "500", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The maximum number of records returned in a single call to poll()" },
     "offsetRepository": { "kind": "property", "displayName": "Offset Repository", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.spi.StateRepository<java.lang.String, java.lang.String>", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The offset repository to use in order to locally stor [...]
     "partitionAssignor": { "kind": "property", "displayName": "Partition Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The class name of the partition assignme [...]
+    "pollOnError": { "kind": "property", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "What to do if kafka  [...]
     "pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used when polling the KafkaConsumer." },
     "seekTo": { "kind": "property", "displayName": "Seek To", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning  [...]
     "sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect failures when using Kafka's group management facilities." },
@@ -59,6 +60,7 @@
     "topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic num [...]
     "valueDeserializer": { "kind": "property", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value th [...]
     "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in ca [...]
+    "pollExceptionStrategy": { "kind": "property", "displayName": "Poll Exception Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, "autowired": true, "secret": false, "description": "To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while pooling messages." },
     "bufferMemorySize": { "kind": "property", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to be [...]
     "compressionCodec": { "kind": "property", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify the [...]
     "connectionMaxIdleMs": { "kind": "property", "displayName": "Connection Max Idle Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "540000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Close idle connections after the number of milliseconds specified by this [...]
@@ -153,6 +155,7 @@
     "maxPollRecords": { "kind": "parameter", "displayName": "Max Poll Records", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "500", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The maximum number of records returned in a single call to poll()" },
     "offsetRepository": { "kind": "parameter", "displayName": "Offset Repository", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.spi.StateRepository<java.lang.String, java.lang.String>", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The offset repository to use in order to locally sto [...]
     "partitionAssignor": { "kind": "parameter", "displayName": "Partition Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The class name of the partition assignm [...]
+    "pollOnError": { "kind": "parameter", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "What to do if kafka [...]
     "pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used when polling the KafkaConsumer." },
     "seekTo": { "kind": "parameter", "displayName": "Seek To", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning [...]
     "sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect failures when using Kafka's group management facilities." },
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index cfaf481..78044a8 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -41,7 +41,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 99 options, which are listed below.
+The Kafka component supports 101 options, which are listed below.
 
 
 
@@ -78,6 +78,7 @@ The Kafka component supports 99 options, which are listed below.
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. |  | StateRepository
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
+| *pollOnError* (consumer) | What to do if Kafka throws an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll the next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message. RECONNECT will re-connect the consumer and try to poll the message again. RETRY  [...]
 | *pollTimeoutMs* (consumer) | The timeout used when polling the KafkaConsumer. | 5000 | Long
 | *seekTo* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning. There are 2 enums and the value can be one of: beginning, end |  | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect failures when using Kafka's group management facilities. | 10000 | Integer
@@ -85,6 +86,7 @@ The Kafka component supports 99 options, which are listed below.
 | *topicIsPattern* (consumer) | Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic number of topics matching the pattern. | false | boolean
 | *valueDeserializer* (consumer) | Deserializer class for value that implements the Deserializer interface. | org.apache.kafka.common.serialization.StringDeserializer | String
 | *kafkaManualCommitFactory* (consumer) | Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in case special logic is needed when doing manual commits that deviates from the default implementation that comes out of the box. |  | KafkaManualCommitFactory
+| *pollExceptionStrategy* (consumer) | *Autowired* To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while polling messages. |  | PollExceptionStrategy
 | *bufferMemorySize* (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additio [...]
 | *compressionCodec* (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none, gzip and snappy. There are 4 enums and the value can be one of: none, gzip, snappy, lz4 | none | String
 | *connectionMaxIdleMs* (producer) | Close idle connections after the number of milliseconds specified by this config. | 540000 | Integer
@@ -171,7 +173,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (96 parameters):
+=== Query Parameters (97 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -206,6 +208,7 @@ with the following path and query parameters:
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. |  | StateRepository
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
+| *pollOnError* (consumer) | What to do if Kafka throws an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll the next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message. RECONNECT will re-connect the consumer and try to poll the message again. RETRY  [...]
 | *pollTimeoutMs* (consumer) | The timeout used when polling the KafkaConsumer. | 5000 | Long
 | *seekTo* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning. There are 2 enums and the value can be one of: beginning, end |  | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect failures when using Kafka's group management facilities. | 10000 | Integer
@@ -321,6 +324,29 @@ After the message is sent to Kafka, the following headers are available
 |===
 
 
+== Consumer error handling
+
+While the Kafka consumer is polling messages from the Kafka broker, errors can happen. This section describes what happens and what
+you can configure.
+
+The consumer may throw an exception when invoking the Kafka `poll` API, for example if a message cannot be de-serialized due to invalid data,
+and for many other kinds of errors. Those errors come in the form of `KafkaException`, which is either _retryable_ or not. The exceptions
+which can be retried (`RetriableException`) will be retried (with a poll timeout in between). All other kinds of exceptions are
+handled according to the _pollOnError_ configuration. This configuration has the following values:
+
+* DISCARD will discard the message and continue to poll the next message.
+* ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message.
+* RECONNECT will re-connect the consumer and try to poll the message again.
+* RETRY will let the consumer retry polling the same message again.
+* STOP will stop the consumer (it has to be manually started/restarted if the consumer should be able to consume messages again).
+
+The default is *ERROR_HANDLER*, which will let Camel's error handler (if any is configured) process the caused exception
+and afterwards continue to poll the next message. This behavior is similar to the _bridgeErrorHandler_ option that
+Camel components have.
+
+For advanced control, a custom implementation of `org.apache.camel.component.kafka.PollExceptionStrategy` can be configured
+on the component level, which allows controlling which exceptions cause which of the strategies above.
+
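+The default can also be changed for all Kafka consumers at once by configuring the component. A minimal
+sketch, assuming the component is looked up from the `CamelContext`:
+
+[source,java]
+----
+KafkaComponent kafka = camelContext.getComponent("kafka", KafkaComponent.class);
+kafka.setPollOnError(PollOnError.DISCARD);
+----
+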
 == Samples
 
 === Consuming messages from Kafka
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultPollExceptionStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultPollExceptionStrategy.java
new file mode 100644
index 0000000..cfdb6b7
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultPollExceptionStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+
+public class DefaultPollExceptionStrategy implements PollExceptionStrategy {
+
+    private PollOnError pollOnError;
+
+    public DefaultPollExceptionStrategy() {
+    }
+
+    public DefaultPollExceptionStrategy(PollOnError pollOnError) {
+        this.pollOnError = pollOnError;
+    }
+
+    @Override
+    public PollOnError handleException(Exception exception) {
+        if (exception instanceof RetriableException) {
+            return PollOnError.RETRY;
+        } else if (exception instanceof WakeupException) {
+            // waking up to stop
+            return PollOnError.STOP;
+        }
+
+        return pollOnError;
+    }
+}
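
The default strategy above retries retriable exceptions, stops on wake-up (used during shutdown), and otherwise falls
back to the configured pollOnError value. A custom strategy implements the same interface; below is a minimal sketch
(the class name and the AuthorizationException mapping are illustrative, not part of this commit):

    import org.apache.camel.component.kafka.PollExceptionStrategy;
    import org.apache.camel.component.kafka.PollOnError;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.RetriableException;

    public class StopOnAuthErrorStrategy implements PollExceptionStrategy {

        @Override
        public PollOnError handleException(Exception exception) {
            if (exception instanceof AuthorizationException) {
                // broken credentials will not heal by polling again
                return PollOnError.STOP;
            } else if (exception instanceof RetriableException) {
                // transient broker-side error, poll again
                return PollOnError.RETRY;
            }
            // let Camel's error handler deal with everything else
            return PollOnError.ERROR_HANDLER;
        }
    }
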
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 205f7a6..dff7fd2 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -37,6 +37,10 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
     private KafkaManualCommitFactory kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
     @Metadata(autowired = true, label = "advanced")
     private KafkaClientFactory kafkaClientFactory = new DefaultKafkaClientFactory();
+    @Metadata(autowired = true, label = "consumer,advanced")
+    private PollExceptionStrategy pollExceptionStrategy;
+    @Metadata(label = "consumer", defaultValue = "ERROR_HANDLER")
+    private PollOnError pollOnError = PollOnError.ERROR_HANDLER;
 
     public KafkaComponent() {
     }
@@ -60,6 +64,7 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
         KafkaConfiguration copy = getConfiguration().copy();
         endpoint.setConfiguration(copy);
         endpoint.getConfiguration().setTopic(remaining);
+        endpoint.getConfiguration().setPollOnError(pollOnError);
 
         setProperties(endpoint, parameters);
 
@@ -125,4 +130,33 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
         this.kafkaClientFactory = kafkaClientFactory;
     }
 
+    public PollExceptionStrategy getPollExceptionStrategy() {
+        return pollExceptionStrategy;
+    }
+
+    /**
+     * To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while
+     * polling messages.
+     */
+    public void setPollExceptionStrategy(PollExceptionStrategy pollExceptionStrategy) {
+        this.pollExceptionStrategy = pollExceptionStrategy;
+    }
+
+    public PollOnError getPollOnError() {
+        return pollOnError;
+    }
+
+    /**
+     * What to do if Kafka throws an exception while polling for new messages.
+     *
+     * The default is ERROR_HANDLER.
+     *
+     * DISCARD will discard the message and continue to poll the next message. ERROR_HANDLER will use Camel's error
+     * handler to process the exception, and afterwards continue to poll the next message. RECONNECT will re-connect
+     * the consumer and try to poll the message again. RETRY will let the consumer retry polling the same message
+     * again. STOP will stop the consumer (it has to be manually started/restarted if the consumer should be able to
+     * consume messages again).
+     */
+    public void setPollOnError(PollOnError pollOnError) {
+        this.pollOnError = pollOnError;
+    }
 }
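
Because pollExceptionStrategy is marked as autowired, it can be supplied either by calling the setter or by binding a
single instance of the type to the registry. A sketch using the custom strategy from above (the bean name is
illustrative):

    KafkaComponent kafka = camelContext.getComponent("kafka", KafkaComponent.class);

    // either set it directly on the component ...
    kafka.setPollExceptionStrategy(new StopOnAuthErrorStrategy());

    // ... or bind it to the registry and let autowiring pick it up
    camelContext.getRegistry().bind("myPollExceptionStrategy", new StopOnAuthErrorStrategy());
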
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index b5155b4..edad497 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -140,6 +140,8 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     private boolean breakOnFirstError;
     @UriParam(label = "consumer")
     private StateRepository<String, String> offsetRepository;
+    @UriParam(label = "consumer")
+    private PollOnError pollOnError;
 
     // Producer configuration properties
     @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
@@ -1725,4 +1727,23 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     public void setSynchronous(boolean synchronous) {
         this.synchronous = synchronous;
     }
+
+    public PollOnError getPollOnError() {
+        return pollOnError;
+    }
+
+    /**
+     * What to do if Kafka throws an exception while polling for new messages.
+     *
+     * Will by default use the value from the component configuration unless an explicit value has been configured on
+     * the endpoint level.
+     *
+     * DISCARD will discard the message and continue to poll the next message. ERROR_HANDLER will use Camel's error
+     * handler to process the exception, and afterwards continue to poll the next message. RECONNECT will re-connect
+     * the consumer and try to poll the message again. RETRY will let the consumer retry polling the same message
+     * again. STOP will stop the consumer (it has to be manually started/restarted if the consumer should be able to
+     * consume messages again).
+     */
+    public void setPollOnError(PollOnError pollOnError) {
+        this.pollOnError = pollOnError;
+    }
 }
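
As the javadoc above states, an explicit endpoint-level pollOnError wins over the component-level default (the
component value is copied onto the endpoint configuration before the endpoint's own parameters are applied). A sketch
inside a RouteBuilder's configure() method (topic names and broker address are illustrative):

    KafkaComponent kafka = getContext().getComponent("kafka", KafkaComponent.class);
    kafka.setPollOnError(PollOnError.RECONNECT); // component-wide default

    // inherits RECONNECT from the component
    from("kafka:orders?brokers=localhost:9092").to("log:orders");

    // explicit endpoint value overrides the component default
    from("kafka:audit?brokers=localhost:9092&pollOnError=DISCARD").to("log:audit");
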
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index b6a146d..297ce8d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 import java.util.stream.StreamSupport;
 
@@ -38,6 +39,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.StateRepository;
+import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
@@ -47,7 +49,6 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.header.Header;
@@ -65,6 +66,8 @@ public class KafkaConsumer extends DefaultConsumer {
     // This list helps working around the infinite loop of KAFKA-1894
     private final List<KafkaFetchRecords> tasks = new ArrayList<>();
     private volatile boolean stopOffsetRepo;
+    private final BridgeExceptionHandlerToErrorHandler bridge = new BridgeExceptionHandlerToErrorHandler(this);
+    private PollExceptionStrategy pollExceptionStrategy;
 
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -74,6 +77,16 @@ public class KafkaConsumer extends DefaultConsumer {
     }
 
     @Override
+    protected void doBuild() throws Exception {
+        super.doBuild();
+        if (endpoint.getComponent().getPollExceptionStrategy() != null) {
+            pollExceptionStrategy = endpoint.getComponent().getPollExceptionStrategy();
+        } else {
+            pollExceptionStrategy = new DefaultPollExceptionStrategy(endpoint.getConfiguration().getPollOnError());
+        }
+    }
+
+    @Override
     public KafkaEndpoint getEndpoint() {
         return (KafkaEndpoint) super.getEndpoint();
     }
@@ -141,6 +154,10 @@ public class KafkaConsumer extends DefaultConsumer {
 
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
+                // signal the Kafka consumer tasks to stop
+                for (KafkaFetchRecords task : tasks) {
+                    task.stop();
+                }
                 int timeout = getEndpoint().getConfiguration().getShutdownTimeout();
                 LOG.debug("Shutting down Kafka consumer worker threads with timeout {} millis", timeout);
                 getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor, timeout);
@@ -183,13 +200,13 @@ public class KafkaConsumer extends DefaultConsumer {
         @Override
         public void run() {
             boolean first = true;
-            boolean reConnect = true;
+            final AtomicBoolean reTry = new AtomicBoolean(true);
+            final AtomicBoolean reConnect = new AtomicBoolean(true);
 
-            while (reConnect) {
+            while (reTry.get() || reConnect.get()) {
                 try {
-                    if (!first) {
-                        // re-initialize on re-connect so we have a fresh
-                        // consumer
+                    if (first || reConnect.get()) {
+                        // re-initialize on re-connect so we have a fresh consumer
                         doInit();
                     }
                 } catch (Exception e) {
@@ -200,19 +217,25 @@ public class KafkaConsumer extends DefaultConsumer {
                 if (!first) {
                     // skip one poll timeout before trying again
                     long delay = endpoint.getConfiguration().getPollTimeoutMs();
-                    LOG.info("Reconnecting {} to topic {} after {} ms", threadId, topicName, delay);
+                    String prefix = reConnect.get() ? "Reconnecting" : "Retrying";
+                    LOG.info("{} {} to topic {} after {} ms", prefix, threadId, topicName, delay);
                     try {
                         Thread.sleep(delay);
                     } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
+                        boolean stopping = endpoint.getCamelContext().isStopping();
+                        if (stopping) {
+                            LOG.info(
+                                    "CamelContext is stopping so terminating KafkaConsumer thread: {} receiving from topic: {}",
+                                    threadId, topicName);
+                            return;
+                        }
                     }
                 }
 
                 first = false;
 
-                // doRun keeps running until we either shutdown or is told to
-                // re-connect
-                reConnect = doRun();
+                // doRun keeps running until we either shut down or are told to re-connect
+                doRun(reTry, reConnect);
             }
 
             LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName);
@@ -239,64 +262,82 @@ public class KafkaConsumer extends DefaultConsumer {
         }
 
         @SuppressWarnings("unchecked")
-        protected boolean doRun() {
-            // allow to re-connect thread in case we use that to retry failed messages
-            boolean reConnect = false;
-            boolean unsubscribing = false;
+        protected void doRun(AtomicBoolean retry, AtomicBoolean reconnect) {
+            if (reconnect.get()) {
+                // on first run or reconnecting
+                doReconnectRun();
+                // set reconnect to false as it's done now
+                reconnect.set(false);
+            }
+            // polling
+            doPollRun(retry, reconnect);
+        }
 
-            try {
-                if (topicPattern != null) {
-                    LOG.info("Subscribing {} to topic pattern {}", threadId, topicName);
-                    consumer.subscribe(topicPattern, this);
-                } else {
-                    LOG.info("Subscribing {} to topic {}", threadId, topicName);
-                    consumer.subscribe(Arrays.asList(topicName.split(",")), this);
-                }
+        protected void doReconnectRun() {
+            if (topicPattern != null) {
+                LOG.info("Subscribing {} to topic pattern {}", threadId, topicName);
+                consumer.subscribe(topicPattern, this);
+            } else {
+                LOG.info("Subscribing {} to topic {}", threadId, topicName);
+                consumer.subscribe(Arrays.asList(topicName.split(",")), this);
+            }
 
-                StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository();
-                if (offsetRepository != null) {
-                    // This poll to ensures we have an assigned partition
-                    // otherwise seek won't work
-                    ConsumerRecords poll = consumer.poll(100);
-
-                    for (TopicPartition topicPartition : (Set<TopicPartition>) consumer.assignment()) {
-                        String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition));
-                        if (offsetState != null && !offsetState.isEmpty()) {
-                            // The state contains the last read offset so you
-                            // need to seek from the next one
-                            long offset = deserializeOffsetValue(offsetState) + 1;
-                            LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
+            StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository();
+            if (offsetRepository != null) {
+                // This poll ensures we have an assigned partition,
+                // otherwise seek won't work
+                ConsumerRecords poll = consumer.poll(100);
+
+                for (TopicPartition topicPartition : (Set<TopicPartition>) consumer.assignment()) {
+                    String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition));
+                    if (offsetState != null && !offsetState.isEmpty()) {
+                        // The state contains the last read offset so you
+                        // need to seek from the next one
+                        long offset = deserializeOffsetValue(offsetState) + 1;
+                        LOG.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
+                        consumer.seek(topicPartition, offset);
+                    } else {
+                        // If the init poll has returned some data of a
+                        // currently unknown topic/partition in the state
+                        // then resume from their offset in order to avoid
+                        // losing data
+                        List<ConsumerRecord<Object, Object>> partitionRecords = poll.records(topicPartition);
+                        if (!partitionRecords.isEmpty()) {
+                            long offset = partitionRecords.get(0).offset();
+                            LOG.debug("Resuming partition {} from offset {}", topicPartition.partition(), offset);
                             consumer.seek(topicPartition, offset);
-                        } else {
-                            // If the init poll has returned some data of a
-                            // currently unknown topic/partition in the state
-                            // then resume from their offset in order to avoid
-                            // losing data
-                            List<ConsumerRecord<Object, Object>> partitionRecords = poll.records(topicPartition);
-                            if (!partitionRecords.isEmpty()) {
-                                long offset = partitionRecords.get(0).offset();
-                                LOG.debug("Resuming partition {} from offset {}", topicPartition.partition(), offset);
-                                consumer.seek(topicPartition, offset);
-                            }
                         }
                     }
-                } else if (endpoint.getConfiguration().getSeekTo() != null) {
-                    if (endpoint.getConfiguration().getSeekTo().equals("beginning")) {
-                        LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName);
-                        // This poll to ensures we have an assigned partition
-                        // otherwise seek won't work
-                        consumer.poll(Duration.ofMillis(100));
-                        consumer.seekToBeginning(consumer.assignment());
-                    } else if (endpoint.getConfiguration().getSeekTo().equals("end")) {
-                        LOG.debug("{} is seeking to the end on topic {}", threadId, topicName);
-                        // This poll to ensures we have an assigned partition
-                        // otherwise seek won't work
-                        consumer.poll(Duration.ofMillis(100));
-                        consumer.seekToEnd(consumer.assignment());
-                    }
                 }
+            } else if (endpoint.getConfiguration().getSeekTo() != null) {
+                if (endpoint.getConfiguration().getSeekTo().equals("beginning")) {
+                    LOG.debug("{} is seeking to the beginning on topic {}", threadId, topicName);
+                    // This poll ensures we have an assigned partition;
+                    // otherwise seek won't work
+                    consumer.poll(Duration.ofMillis(100));
+                    consumer.seekToBeginning(consumer.assignment());
+                } else if (endpoint.getConfiguration().getSeekTo().equals("end")) {
+                    LOG.debug("{} is seeking to the end on topic {}", threadId, topicName);
+                    // This poll ensures we have an assigned partition;
+                    // otherwise seek won't work
+                    consumer.poll(Duration.ofMillis(100));
+                    consumer.seekToEnd(consumer.assignment());
+                }
+            }
+        }
+
+        protected void doPollRun(AtomicBoolean retry, AtomicBoolean reconnect) {
+            StateRepository<String, String> offsetRepository = endpoint.getConfiguration().getOffsetRepository();
 
-                while (isRunAllowed() && !reConnect && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
+            // allow the thread to re-connect in case we use that to retry failed messages
+            boolean unsubscribing = false;
+
+            TopicPartition partition = null;
+            long partitionLastOffset = -1;
+
+            try {
+                while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()
+                        && retry.get() && !reconnect.get()) {
 
                     // flag to break out processing on the first exception
                     boolean breakOnErrorHit = false;
@@ -305,8 +346,8 @@ public class KafkaConsumer extends DefaultConsumer {
 
                     Iterator<TopicPartition> partitionIterator = allRecords.partitions().iterator();
                     while (partitionIterator.hasNext()) {
-                        TopicPartition partition = partitionIterator.next();
-                        long partitionLastOffset = -1;
+                        partition = partitionIterator.next();
+                        partitionLastOffset = -1;
 
                         Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator();
                         LOG.debug("Records count {} received for partition {}", allRecords.records(partition).size(),
@@ -392,11 +433,11 @@ public class KafkaConsumer extends DefaultConsumer {
 
                     if (breakOnErrorHit) {
                         // force re-connect
-                        reConnect = true;
+                        reconnect.set(true);
                     }
                 }
 
-                if (!reConnect) {
+                if (!reconnect.get()) {
                     if (isAutoCommitEnabled()) {
                         if ("async".equals(endpoint.getConfiguration().getAutoCommitOnStop())) {
                             LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName);
@@ -417,25 +458,100 @@ public class KafkaConsumer extends DefaultConsumer {
                 LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();
                 Thread.currentThread().interrupt();
-            } catch (KafkaException e) {
-                // some kind of error in kafka, it may happen during
-                // unsubscribing or during normal processing
+            } catch (Exception e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Exception caught while polling " + threadId + " from kafka topic " + topicName
+                              + " at offset " + lastProcessedOffset + ". Deciding what to do.",
+                            e);
+                }
                 if (unsubscribing) {
+                    // some kind of error in kafka, it may happen during unsubscribing
                     getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName,
                             e);
                 } else {
-                    LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run",
-                            threadId, topicName, e.getMessage());
-                    reConnect = true;
+                    PollOnError onError = pollExceptionStrategy.handleException(e);
+                    if (PollOnError.RETRY == onError) {
+                        LOG.warn(
+                                "{} consuming {} from topic {} causedby {}. Will attempt again polling the same message (stacktrace in DEBUG logging level)",
+                                e.getClass().getName(), threadId, topicName, e.getMessage());
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "KafkaException consuming {} from topic {} causedby {}. Will attempt again polling the same message",
+                                    threadId, topicName, e.getMessage(), e);
+                        }
+                        // let the consumer retry the same message again
+                        retry.set(true);
+                    } else if (PollOnError.RECONNECT == onError) {
+                        LOG.warn(
+                                "{} consuming {} from topic {} causedby {}. Will attempt to re-connect on next run (stacktrace in DEBUG logging level)",
+                                e.getClass().getName(), threadId, topicName, e.getMessage());
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "{} consuming {} from topic {} causedby {}. Will attempt to re-connect on next run",
+                                    e.getClass().getName(), threadId, topicName, e.getMessage(), e);
+                        }
+                        // re-connect so the consumer can try the same message again
+                        reconnect.set(true);
+                    } else if (PollOnError.ERROR_HANDLER == onError) {
+                        // use bridge error handler to route with exception
+                        bridge.handleException(e);
+                        // skip this poison message and seek to next message
+                        seekToNextOffset(partitionLastOffset);
+                    } else if (PollOnError.DISCARD == onError) {
+                        // discard message
+                        LOG.warn(
+                                "{} consuming {} from topic {} causedby {}. Will discard the message and continue to poll the next message (stracktrace in DEBUG logging level).",
+                                e.getClass().getName(), threadId, topicName, e.getMessage());
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "{} consuming {} from topic {} causedby {}. Will discard the message and continue to poll the next message.",
+                                    e.getClass().getName(), threadId, topicName, e.getMessage(), e);
+                        }
+                        // skip this poison message and seek to next message
+                        seekToNextOffset(partitionLastOffset);
+                    } else if (PollOnError.STOP == onError) {
+                        // stop and terminate consumer
+                        LOG.warn(
+                                "{} consuming {} from topic {} causedby {}. Will stop consumer (stacktrace in DEBUG logging level).",
+                                e.getClass().getName(), threadId, topicName, e.getMessage());
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "{} consuming {} from topic {} causedby {}. Will stop consumer.",
+                                    e.getClass().getName(), threadId, topicName, e.getMessage(), e);
+                        }
+                        retry.set(false);
+                        reconnect.set(false);
+                    }
                 }
-            } catch (Exception e) {
-                getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
             } finally {
-                LOG.debug("Closing {}", threadId);
-                IOHelper.close(consumer);
+                // only close if not retrying or re-connecting
+                if (!retry.get() && !reconnect.get()) {
+                    LOG.debug("Closing consumer {}", threadId);
+                    IOHelper.close(consumer);
+                }
             }
+        }
 
-            return reConnect;
+        private void seekToNextOffset(long partitionLastOffset) {
+            boolean logged = false;
+            Set<TopicPartition> tps = (Set<TopicPartition>) consumer.assignment();
+            if (tps != null && partitionLastOffset != -1) {
+                long next = partitionLastOffset + 1;
+                LOG.info("Consumer seeking to next offset {} to continue polling next message from topic: {}", next, topicName);
+                for (TopicPartition tp : tps) {
+                    consumer.seek(tp, next);
+                }
+            } else if (tps != null) {
+                for (TopicPartition tp : tps) {
+                    long next = consumer.position(tp) + 1;
+                    if (!logged) {
+                        LOG.info("Consumer seeking to next offset {} to continue polling next message from topic: {}", next,
+                                topicName);
+                        logged = true;
+                    }
+                    consumer.seek(tp, next);
+                }
+            }
         }
 
         private void commitOffset(
@@ -455,7 +571,13 @@ public class KafkaConsumer extends DefaultConsumer {
             }
         }
 
-        private void shutdown() {
+        void stop() {
+            // As advised in the KAFKA-1894 ticket, calling this wakeup method
+            // breaks the infinite loop
+            consumer.wakeup();
+        }
+
+        void shutdown() {
             // As advised in the KAFKA-1894 ticket, calling this wakeup method
             // breaks the infinite loop
             consumer.wakeup();
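
Both stop() and shutdown() rely on the wakeup pattern from the KAFKA-1894 ticket: wakeup() called from another thread makes a blocking poll() throw WakeupException, which breaks the infinite loop. As a minimal standalone sketch of that pattern (the class and flag names are illustrative, not part of this commit):

    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.errors.WakeupException;

    class PollLoop {
        // consumer.wakeup() from another thread makes the blocked poll()
        // throw WakeupException, which is how the loop below terminates
        static void run(Consumer<String, String> consumer, AtomicBoolean running) {
            try {
                while (running.get()) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
                    records.forEach(r -> System.out.println(r.value()));
                }
            } catch (WakeupException e) {
                // expected during shutdown; fall through to close the consumer
            } finally {
                consumer.close();
            }
        }
    }
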
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
new file mode 100644
index 0000000..5a004c8
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollExceptionStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+/**
+ * Strategy that decides how to handle an exception thrown by Kafka during polling. For example by re-connecting
+ * and polling the same message again, by stopping the consumer (which allows a re-balance so another consumer can try), by
+ * letting Camel route the message as an exception so that Camel error handling can handle it, or by
+ * discarding the message and polling the next message.
+ */
+public interface PollExceptionStrategy {
+
+    /**
+     * Controls how to handle the exception while polling from Kafka.
+     *
+     * @param  exception the caused exception which typically would be a {@link org.apache.kafka.common.KafkaException}
+     * @return           how to handle the exception
+     */
+    PollOnError handleException(Exception exception);
+}
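
To illustrate the contract, a minimal custom strategy could map de-serialization failures to DISCARD and fall back to RECONNECT for everything else (this class is a hypothetical example, not part of the commit):

    import org.apache.camel.component.kafka.PollExceptionStrategy;
    import org.apache.camel.component.kafka.PollOnError;
    import org.apache.kafka.common.errors.SerializationException;

    public class DiscardPoisonMessagesStrategy implements PollExceptionStrategy {
        @Override
        public PollOnError handleException(Exception exception) {
            if (exception instanceof SerializationException) {
                // a poison message that can never be de-serialized: skip it
                return PollOnError.DISCARD;
            }
            // anything else: re-connect and let the consumer try again
            return PollOnError.RECONNECT;
        }
    }
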
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollOnError.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollOnError.java
new file mode 100644
index 0000000..1076540
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/PollOnError.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+/**
+ * DISCARD will discard the message and continue to poll the next message. ERROR_HANDLER will use Camel's error handler
+ * to process the exception, and afterwards continue to poll the next message. RECONNECT will re-connect the consumer and
+ * try to poll the message again. RETRY will let the consumer retry polling the same message again. STOP will stop the
+ * consumer (it has to be manually started/restarted if the consumer should be able to consume messages again).
+ */
+public enum PollOnError {
+    DISCARD,
+    ERROR_HANDLER,
+    RECONNECT,
+    RETRY,
+    STOP
+}
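
A route can select a policy per endpoint via the new pollOnError URI option; a sketch of a route fragment inside a RouteBuilder's configure() (topic and broker values are illustrative):

    from("kafka:orders?brokers=localhost:9092&pollOnError=DISCARD")
        .to("log:orders");
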
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 9a8c878..4c7611c 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -606,6 +606,32 @@ public interface KafkaComponentBuilderFactory {
             return this;
         }
         /**
+         * What to do if Kafka throws an exception while polling for new
+         * messages. Will by default use the value from the component
+         * configuration unless an explicit value has been configured on the
+         * endpoint level. DISCARD will discard the message and continue to
+         * poll the next message. ERROR_HANDLER will use Camel's error handler
+         * to process the exception, and afterwards continue to poll the next
+         * message. RECONNECT will re-connect the consumer and try to poll the
+         * message again. RETRY will let the consumer retry polling the same
+         * message again. STOP will stop the consumer (it has to be manually
+         * started/restarted if the consumer should be able to consume messages again).
+         * 
+         * The option is a:
+         * &lt;code&gt;org.apache.camel.component.kafka.PollOnError&lt;/code&gt;
+         * type.
+         * 
+         * Group: consumer
+         * 
+         * @param pollOnError the value to set
+         * @return the dsl builder
+         */
+        default KafkaComponentBuilder pollOnError(
+                org.apache.camel.component.kafka.PollOnError pollOnError) {
+            doSetProperty("pollOnError", pollOnError);
+            return this;
+        }
+        /**
          * The timeout used when polling the KafkaConsumer.
          * 
          * The option is a: &lt;code&gt;java.lang.Long&lt;/code&gt; type.
@@ -726,6 +752,23 @@ public interface KafkaComponentBuilderFactory {
             return this;
         }
         /**
+         * To use a custom strategy with the consumer to control how to handle
+         * exceptions thrown from the Kafka broker while polling messages.
+         * 
+         * The option is a:
+         * &lt;code&gt;org.apache.camel.component.kafka.PollExceptionStrategy&lt;/code&gt; type.
+         * 
+         * Group: consumer (advanced)
+         * 
+         * @param pollExceptionStrategy the value to set
+         * @return the dsl builder
+         */
+        default KafkaComponentBuilder pollExceptionStrategy(
+                org.apache.camel.component.kafka.PollExceptionStrategy pollExceptionStrategy) {
+            doSetProperty("pollExceptionStrategy", pollExceptionStrategy);
+            return this;
+        }
+        /**
          * The total bytes of memory the producer can use to buffer records
          * waiting to be sent to the server. If records are sent faster than
          * they can be delivered to the server the producer will either block or
@@ -1924,6 +1967,7 @@ public interface KafkaComponentBuilderFactory {
             case "maxPollRecords": getOrCreateConfiguration((KafkaComponent) component).setMaxPollRecords((java.lang.Integer) value); return true;
             case "offsetRepository": getOrCreateConfiguration((KafkaComponent) component).setOffsetRepository((org.apache.camel.spi.StateRepository) value); return true;
             case "partitionAssignor": getOrCreateConfiguration((KafkaComponent) component).setPartitionAssignor((java.lang.String) value); return true;
+            case "pollOnError": getOrCreateConfiguration((KafkaComponent) component).setPollOnError((org.apache.camel.component.kafka.PollOnError) value); return true;
             case "pollTimeoutMs": getOrCreateConfiguration((KafkaComponent) component).setPollTimeoutMs((java.lang.Long) value); return true;
             case "seekTo": getOrCreateConfiguration((KafkaComponent) component).setSeekTo((java.lang.String) value); return true;
             case "sessionTimeoutMs": getOrCreateConfiguration((KafkaComponent) component).setSessionTimeoutMs((java.lang.Integer) value); return true;
@@ -1931,6 +1975,7 @@ public interface KafkaComponentBuilderFactory {
             case "topicIsPattern": getOrCreateConfiguration((KafkaComponent) component).setTopicIsPattern((boolean) value); return true;
             case "valueDeserializer": getOrCreateConfiguration((KafkaComponent) component).setValueDeserializer((java.lang.String) value); return true;
             case "kafkaManualCommitFactory": ((KafkaComponent) component).setKafkaManualCommitFactory((org.apache.camel.component.kafka.KafkaManualCommitFactory) value); return true;
+            case "pollExceptionStrategy": ((KafkaComponent) component).setPollExceptionStrategy((org.apache.camel.component.kafka.PollExceptionStrategy) value); return true;
             case "bufferMemorySize": getOrCreateConfiguration((KafkaComponent) component).setBufferMemorySize((java.lang.Integer) value); return true;
             case "compressionCodec": getOrCreateConfiguration((KafkaComponent) component).setCompressionCodec((java.lang.String) value); return true;
             case "connectionMaxIdleMs": getOrCreateConfiguration((KafkaComponent) component).setConnectionMaxIdleMs((java.lang.Integer) value); return true;
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index f8b20a3..5c18f44 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -1024,6 +1024,56 @@ public interface KafkaEndpointBuilderFactory {
             return this;
         }
         /**
+         * What to do if Kafka throws an exception while polling for new
+         * messages. Will by default use the value from the component
+         * configuration unless an explicit value has been configured on the
+         * endpoint level. DISCARD will discard the message and continue to
+         * poll the next message. ERROR_HANDLER will use Camel's error handler
+         * to process the exception, and afterwards continue to poll the next
+         * message. RECONNECT will re-connect the consumer and try to poll the
+         * message again. RETRY will let the consumer retry polling the same
+         * message again. STOP will stop the consumer (it has to be manually
+         * started/restarted if the consumer should be able to consume messages again).
+         * 
+         * The option is a:
+         * &lt;code&gt;org.apache.camel.component.kafka.PollOnError&lt;/code&gt;
+         * type.
+         * 
+         * Group: consumer
+         * 
+         * @param pollOnError the value to set
+         * @return the dsl builder
+         */
+        default KafkaEndpointConsumerBuilder pollOnError(PollOnError pollOnError) {
+            doSetProperty("pollOnError", pollOnError);
+            return this;
+        }
+        /**
+         * What to do if Kafka throws an exception while polling for new
+         * messages. Will by default use the value from the component
+         * configuration unless an explicit value has been configured on the
+         * endpoint level. DISCARD will discard the message and continue to
+         * poll the next message. ERROR_HANDLER will use Camel's error handler
+         * to process the exception, and afterwards continue to poll the next
+         * message. RECONNECT will re-connect the consumer and try to poll the
+         * message again. RETRY will let the consumer retry polling the same
+         * message again. STOP will stop the consumer (it has to be manually
+         * started/restarted if the consumer should be able to consume messages again).
+         * 
+         * The option will be converted to a
+         * &lt;code&gt;org.apache.camel.component.kafka.PollOnError&lt;/code&gt;
+         * type.
+         * 
+         * Group: consumer
+         * 
+         * @param pollOnError the value to set
+         * @return the dsl builder
+         */
+        default KafkaEndpointConsumerBuilder pollOnError(String pollOnError) {
+            doSetProperty("pollOnError", pollOnError);
+            return this;
+        }
+        /**
          * The timeout used when polling the KafkaConsumer.
          * 
          * The option is a: &lt;code&gt;java.lang.Long&lt;/code&gt; type.
@@ -4341,6 +4391,18 @@ public interface KafkaEndpointBuilderFactory {
         }
     }
 
+    /**
+     * Proxy enum for <code>org.apache.camel.component.kafka.PollOnError</code>
+     * enum.
+     */
+    enum PollOnError {
+        DISCARD,
+        ERROR_HANDLER,
+        RECONNECT,
+        RETRY,
+        STOP;
+    }
+
     public interface KafkaBuilders {
         /**
          * Kafka (camel-kafka)
diff --git a/docs/components/modules/ROOT/pages/kafka-component.adoc b/docs/components/modules/ROOT/pages/kafka-component.adoc
index c74f74c..3de8eba 100644
--- a/docs/components/modules/ROOT/pages/kafka-component.adoc
+++ b/docs/components/modules/ROOT/pages/kafka-component.adoc
@@ -43,7 +43,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 99 options, which are listed below.
+The Kafka component supports 101 options, which are listed below.
 
 
 
@@ -80,6 +80,7 @@ The Kafka component supports 99 options, which are listed below.
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. |  | StateRepository
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
+| *pollOnError* (consumer) | What to do if Kafka throws an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll the next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message. RECONNECT will re-connect the consumer and try to poll the message again. RETRY  [...]
 | *pollTimeoutMs* (consumer) | The timeout used when polling the KafkaConsumer. | 5000 | Long
 | *seekTo* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning. There are 2 enums and the value can be one of: beginning, end |  | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect failures when using Kafka's group management facilities. | 10000 | Integer
@@ -87,6 +88,7 @@ The Kafka component supports 99 options, which are listed below.
 | *topicIsPattern* (consumer) | Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic number of topics matching the pattern. | false | boolean
 | *valueDeserializer* (consumer) | Deserializer class for value that implements the Deserializer interface. | org.apache.kafka.common.serialization.StringDeserializer | String
 | *kafkaManualCommitFactory* (consumer) | Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in case special logic is needed when doing manual commits that deviates from the default implementation that comes out of the box. |  | KafkaManualCommitFactory
+| *pollExceptionStrategy* (consumer) | *Autowired* To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while polling messages. |  | PollExceptionStrategy
 | *bufferMemorySize* (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additio [...]
 | *compressionCodec* (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none, gzip and snappy. There are 4 enums and the value can be one of: none, gzip, snappy, lz4 | none | String
 | *connectionMaxIdleMs* (producer) | Close idle connections after the number of milliseconds specified by this config. | 540000 | Integer
@@ -173,7 +175,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (96 parameters):
+=== Query Parameters (97 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -208,6 +210,7 @@ with the following path and query parameters:
 | *maxPollRecords* (consumer) | The maximum number of records returned in a single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit. |  | StateRepository
 | *partitionAssignor* (consumer) | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
+| *pollOnError* (consumer) | What to do if Kafka throws an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll the next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message. RECONNECT will re-connect the consumer and try to poll the message again. RETRY  [...]
 | *pollTimeoutMs* (consumer) | The timeout used when polling the KafkaConsumer. | 5000 | Long
 | *seekTo* (consumer) | Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning. There are 2 enums and the value can be one of: beginning, end |  | String
 | *sessionTimeoutMs* (consumer) | The timeout used to detect failures when using Kafka's group management facilities. | 10000 | Integer
@@ -323,6 +326,29 @@ After the message is sent to Kafka, the following headers are available
 |===
 
 
+== Consumer error handling
+
+While the Kafka consumer is polling messages from the Kafka broker, errors can happen. This section describes what happens and what
+you can configure.
+
+The consumer may throw an exception when invoking the Kafka `poll` API, for example if a message cannot be de-serialized due to invalid data,
+among many other kinds of errors. Those errors are in the form of `KafkaException`, which are either _retryable_ or not. The exceptions
+which can be retried (`RetriableException`) will be retried again (with a poll timeout in between). All other kinds of exceptions are
+handled according to the _pollOnError_ configuration. This configuration has the following values:
+
+* DISCARD will discard the message and continue to poll the next message.
+* ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll the next message.
+* RECONNECT will re-connect the consumer and try to poll the message again.
+* RETRY will let the consumer retry polling the same message again.
+* STOP will stop the consumer (it has to be manually started/restarted if the consumer should be able to consume messages again).
+
+The default is *ERROR_HANDLER*, which will let Camel's error handler (if any is configured) process the caused exception,
+and afterwards continue to poll the next message. This behavior is similar to the _bridgeErrorHandler_ option that
+Camel components have.
+
+For advanced control, a custom implementation of `org.apache.camel.component.kafka.PollExceptionStrategy` can be configured
+on the component level, which allows you to control which exceptions trigger which of the strategies above.
+
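+As a sketch, a custom strategy (the class name `MyPollExceptionStrategy` is illustrative) can be registered on the component like this:
+
+[source,java]
+----
+KafkaComponent kafka = camelContext.getComponent("kafka", KafkaComponent.class);
+kafka.setPollExceptionStrategy(new MyPollExceptionStrategy());
+----
+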
 == Samples
 
 === Consuming messages from Kafka
diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_9.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_9.adoc
index 9e52652..93b614c 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_9.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_9.adoc
@@ -150,4 +150,17 @@ The camel-debezium-parent module has been renamed to camel-debezium-common-paren
 
 === camel-jclouds
 
-The camel-jclouds feature for Camel on Karaf has been removed.
\ No newline at end of file
+The camel-jclouds feature for Camel on Karaf has been removed.
+
+=== camel-kafka
+
+The camel-kafka consumer has been improved to be more robust and has more configuration options for how to deal with exceptions while polling from Kafka brokers.
+Previously, in case of any exception thrown, the consumer would re-connect and therefore try again. This would lead the Kafka broker to reassign the partitions,
+but it might assign them back to the same consumer again, or to another standby consumer.
+
+The new behavior is to only retry certain kinds of exceptions which Kafka has marked as retryable. Any other exception now
+causes Camel's error handler to handle the caused exception (it will log by default, but you can use onException etc.), and then
+skip to the next offset so the next message can be polled and processed by Camel.
+
+See the updated camel-kafka documentation for more details.
+
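+For example, with the default *ERROR_HANDLER* behavior the caused exception can be handled with `onException` in a route (a minimal sketch; the endpoint URIs are illustrative):
+
+[source,java]
+----
+onException(Exception.class)
+    .handled(true)
+    .to("log:kafka-poll-error");
+
+from("kafka:my-topic")
+    .to("mock:result");
+----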