Posted to commits@camel.apache.org by da...@apache.org on 2021/03/20 14:37:46 UTC

[camel] 01/01: CAMEL-14980: camel-kafka - Consumer should only re-connect on retryable exceptions. Other severe exceptions should be propagated.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch 14980
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c123f7c98a038838f4ef9b26a9ddf7193783d80e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Mar 20 15:37:04 2021 +0100

    CAMEL-14980: camel-kafka - Consumer should only re-connect on retryable exceptions. Other severe exceptions should be propagated.
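
For illustration, a minimal sketch (not part of this commit) of how an application
could plug the new strategy into the Kafka component; the class name
KafkaReconnectStrategyConfig and the configure method are hypothetical:

    import org.apache.camel.CamelContext;
    import org.apache.camel.component.kafka.KafkaComponent;
    import org.apache.kafka.common.errors.RetriableException;

    public class KafkaReconnectStrategyConfig {

        public static void configure(CamelContext camelContext) {
            KafkaComponent kafka = camelContext.getComponent("kafka", KafkaComponent.class);
            // only re-connect on exceptions Kafka itself marks as retriable;
            // everything else is propagated to the Camel ExceptionHandler
            kafka.setKafkaConsumerReconnectExceptionStrategy(
                    exception -> exception instanceof RetriableException);
        }
    }

The new interface has a single abstract method, reconnect(Exception), so a lambda
can be used as above.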
---
 .../apache/camel/catalog/docs/kafka-component.adoc |  3 +-
 .../component/kafka/KafkaComponentConfigurer.java  |  6 ++++
 .../org/apache/camel/component/kafka/kafka.json    |  1 +
 .../camel-kafka/src/main/docs/kafka-component.adoc |  3 +-
 ...ultKafkaConsumerReconnectExceptionStrategy.java | 34 +++++++++++++++++++++
 .../camel/component/kafka/KafkaComponent.java      | 15 ++++++++++
 .../camel/component/kafka/KafkaConsumer.java       | 35 +++++++++++++++-------
 .../KafkaConsumerReconnectExceptionStrategy.java   | 34 +++++++++++++++++++++
 .../dsl/KafkaComponentBuilderFactory.java          | 18 +++++++++++
 .../modules/ROOT/pages/kafka-component.adoc        |  3 +-
 10 files changed, 138 insertions(+), 14 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
index cfaf481..02f20fb 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/kafka-component.adoc
@@ -41,7 +41,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 99 options, which are listed below.
+The Kafka component supports 100 options, which are listed below.
 
 
 
@@ -84,6 +84,7 @@ The Kafka component supports 99 options, which are listed below.
 | *specificAvroReader* (consumer) | This enables the use of a specific Avro reader for use with the Confluent Platform schema registry and the io.confluent.kafka.serializers.KafkaAvroDeserializer. This option is only available in the Confluent Platform (not standard Apache Kafka) | false | boolean
 | *topicIsPattern* (consumer) | Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic number of topics matching the pattern. | false | boolean
 | *valueDeserializer* (consumer) | Deserializer class for value that implements the Deserializer interface. | org.apache.kafka.common.serialization.StringDeserializer | String
+| *kafkaConsumerReconnect{zwsp}ExceptionStrategy* (consumer) | To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while polling messages. |  | KafkaConsumerReconnectExceptionStrategy
 | *kafkaManualCommitFactory* (consumer) | Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in case special logic is needed when doing manual commits that deviates from the default implementation that comes out of the box. |  | KafkaManualCommitFactory
 | *bufferMemorySize* (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additio [...]
 | *compressionCodec* (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none, gzip and snappy. There are 4 enums and the value can be one of: none, gzip, snappy, lz4 | none | String
diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index e44e7a1..70a67bd 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -86,6 +86,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
         case "interceptorClasses": getOrCreateConfiguration(target).setInterceptorClasses(property(camelContext, java.lang.String.class, value)); return true;
         case "kafkaclientfactory":
         case "kafkaClientFactory": target.setKafkaClientFactory(property(camelContext, org.apache.camel.component.kafka.KafkaClientFactory.class, value)); return true;
+        case "kafkaconsumerreconnectexceptionstrategy":
+        case "kafkaConsumerReconnectExceptionStrategy": target.setKafkaConsumerReconnectExceptionStrategy(property(camelContext, org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy.class, value)); return true;
         case "kafkamanualcommitfactory":
         case "kafkaManualCommitFactory": target.setKafkaManualCommitFactory(property(camelContext, org.apache.camel.component.kafka.KafkaManualCommitFactory.class, value)); return true;
         case "kerberosbeforereloginmintime":
@@ -290,6 +292,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
         case "interceptorClasses": return java.lang.String.class;
         case "kafkaclientfactory":
         case "kafkaClientFactory": return org.apache.camel.component.kafka.KafkaClientFactory.class;
+        case "kafkaconsumerreconnectexceptionstrategy":
+        case "kafkaConsumerReconnectExceptionStrategy": return org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy.class;
         case "kafkamanualcommitfactory":
         case "kafkaManualCommitFactory": return org.apache.camel.component.kafka.KafkaManualCommitFactory.class;
         case "kerberosbeforereloginmintime":
@@ -490,6 +494,8 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
         case "interceptorClasses": return getOrCreateConfiguration(target).getInterceptorClasses();
         case "kafkaclientfactory":
         case "kafkaClientFactory": return target.getKafkaClientFactory();
+        case "kafkaconsumerreconnectexceptionstrategy":
+        case "kafkaConsumerReconnectExceptionStrategy": return target.getKafkaConsumerReconnectExceptionStrategy();
         case "kafkamanualcommitfactory":
         case "kafkaManualCommitFactory": return target.getKafkaManualCommitFactory();
         case "kerberosbeforereloginmintime":
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index a23b6e7..c48fbca 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -58,6 +58,7 @@
     "specificAvroReader": { "kind": "property", "displayName": "Specific Avro Reader", "group": "consumer", "label": "confluent,consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This enables the use of a specific Avro reader for use with the Confluent Platf [...]
     "topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic num [...]
     "valueDeserializer": { "kind": "property", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value th [...]
+    "kafkaConsumerReconnectExceptionStrategy": { "kind": "property", "displayName": "Kafka Consumer Reconnect Exception Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka [...]
     "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in ca [...]
     "bufferMemorySize": { "kind": "property", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to be [...]
     "compressionCodec": { "kind": "property", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify the [...]
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index cfaf481..02f20fb 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -41,7 +41,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 99 options, which are listed below.
+The Kafka component supports 100 options, which are listed below.
 
 
 
@@ -84,6 +84,7 @@ The Kafka component supports 99 options, which are listed below.
 | *specificAvroReader* (consumer) | This enables the use of a specific Avro reader for use with the Confluent Platform schema registry and the io.confluent.kafka.serializers.KafkaAvroDeserializer. This option is only available in the Confluent Platform (not standard Apache Kafka) | false | boolean
 | *topicIsPattern* (consumer) | Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic number of topics matching the pattern. | false | boolean
 | *valueDeserializer* (consumer) | Deserializer class for value that implements the Deserializer interface. | org.apache.kafka.common.serialization.StringDeserializer | String
+| *kafkaConsumerReconnect{zwsp}ExceptionStrategy* (consumer) | To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while polling messages. |  | KafkaConsumerReconnectExceptionStrategy
 | *kafkaManualCommitFactory* (consumer) | Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in case special logic is needed when doing manual commits that deviates from the default implementation that comes out of the box. |  | KafkaManualCommitFactory
 | *bufferMemorySize* (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additio [...]
 | *compressionCodec* (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none, gzip and snappy. There are 4 enums and the value can be one of: none, gzip, snappy, lz4 | none | String
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaConsumerReconnectExceptionStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaConsumerReconnectExceptionStrategy.java
new file mode 100644
index 0000000..65235b2
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/DefaultKafkaConsumerReconnectExceptionStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+
+public class DefaultKafkaConsumerReconnectExceptionStrategy implements KafkaConsumerReconnectExceptionStrategy {
+
+    @Override
+    public boolean reconnect(Exception exception) {
+        // only re-connect on exceptions that indicate the error is recoverable, or if some external thread is waking up the kafka consumer
+        if (exception instanceof RetriableException || exception instanceof WakeupException) {
+            return true;
+        }
+
+        // cannot recover so let Camel exception handler deal with it
+        return false;
+    }
+}
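
As a quick illustration of the default decisions above (a sketch, not a test from
this commit; TimeoutException extends RetriableException, while SerializationException
is a KafkaException that is not retriable):

    import org.apache.camel.component.kafka.DefaultKafkaConsumerReconnectExceptionStrategy;
    import org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.errors.TimeoutException;

    public class DefaultStrategyDemo {
        public static void main(String[] args) {
            KafkaConsumerReconnectExceptionStrategy strategy
                    = new DefaultKafkaConsumerReconnectExceptionStrategy();

            // retriable broker problem -> re-connect and poll again
            System.out.println(strategy.reconnect(new TimeoutException("broker slow")));       // true

            // non-retriable problem -> propagate to the Camel exception handler
            System.out.println(strategy.reconnect(new SerializationException("bad payload"))); // false
        }
    }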
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 205f7a6..a25c857 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -37,6 +37,9 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
     private KafkaManualCommitFactory kafkaManualCommitFactory = new DefaultKafkaManualCommitFactory();
     @Metadata(autowired = true, label = "advanced")
     private KafkaClientFactory kafkaClientFactory = new DefaultKafkaClientFactory();
+    @Metadata(label = "consumer,advanced")
+    private KafkaConsumerReconnectExceptionStrategy kafkaConsumerReconnectExceptionStrategy
+            = new DefaultKafkaConsumerReconnectExceptionStrategy();
 
     public KafkaComponent() {
     }
@@ -125,4 +128,16 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
         this.kafkaClientFactory = kafkaClientFactory;
     }
 
+    public KafkaConsumerReconnectExceptionStrategy getKafkaConsumerReconnectExceptionStrategy() {
+        return kafkaConsumerReconnectExceptionStrategy;
+    }
+
+    /**
+     * To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while
+     * polling messages.
+     */
+    public void setKafkaConsumerReconnectExceptionStrategy(
+            KafkaConsumerReconnectExceptionStrategy kafkaConsumerReconnectExceptionStrategy) {
+        this.kafkaConsumerReconnectExceptionStrategy = kafkaConsumerReconnectExceptionStrategy;
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index b6a146d..c2bc7b3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -47,7 +47,6 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.header.Header;
@@ -417,22 +416,36 @@ public class KafkaConsumer extends DefaultConsumer {
                 LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
                 consumer.unsubscribe();
                 Thread.currentThread().interrupt();
-            } catch (KafkaException e) {
-                // some kind of error in kafka, it may happen during
-                // unsubscribing or during normal processing
+            } catch (Exception e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Exception caught while polling " + threadId + " from kafka topic " + topicName
+                              + ". Deciding what to do.",
+                            e);
+                }
                 if (unsubscribing) {
+                    // some kind of error in kafka, it may happen during unsubscribing
                     getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName,
                             e);
                 } else {
-                    LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run",
-                            threadId, topicName, e.getMessage());
-                    reConnect = true;
+                    boolean retry = getEndpoint().getComponent().getKafkaConsumerReconnectExceptionStrategy().reconnect(e);
+                    if (retry) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "KafkaException consuming {} from topic {} caused by {}. Will attempt to re-connect on next run",
+                                    threadId, topicName, e.getMessage());
+                        }
+                        reConnect = true;
+                    } else {
+                        getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic " + topicName,
+                                e);
+                    }
                 }
-            } catch (Exception e) {
-                getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
             } finally {
-                LOG.debug("Closing {}", threadId);
-                IOHelper.close(consumer);
+                // only close if not re-connecting
+                if (!reConnect) {
+                    LOG.debug("Closing {}", threadId);
+                    IOHelper.close(consumer);
+                }
             }
 
             return reConnect;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerReconnectExceptionStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerReconnectExceptionStrategy.java
new file mode 100644
index 0000000..676e07d
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumerReconnectExceptionStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+/**
+ * Strategy to decide, when a Kafka exception is thrown during polling, how to handle it: either re-connect with a new
+ * session and retry polling, or let the Camel {@link org.apache.camel.spi.ExceptionHandler} handle the exception.
+ */
+public interface KafkaConsumerReconnectExceptionStrategy {
+
+    /**
+     * Whether to reconnect or let Camel {@link org.apache.camel.spi.ExceptionHandler} handle the exception.
+     *
+     * @param  exception the thrown exception, which typically is a {@link org.apache.kafka.common.KafkaException}
+     * @return           true to re-connect, false to let Camel {@link org.apache.camel.spi.ExceptionHandler} handle the
+     *                   exception
+     */
+    boolean reconnect(Exception exception);
+}
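
A custom implementation might widen or narrow the default policy. As a sketch (the
class name AggressiveReconnectStrategy is hypothetical), a strategy that re-connects
on any Kafka-level exception but treats authentication failures as fatal:

    import org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthenticationException;

    public class AggressiveReconnectStrategy implements KafkaConsumerReconnectExceptionStrategy {

        @Override
        public boolean reconnect(Exception exception) {
            // an authentication problem will not fix itself by re-connecting,
            // so let the Camel exception handler deal with it
            if (exception instanceof AuthenticationException) {
                return false;
            }
            // any other Kafka-level problem: drop the session and poll again with a fresh consumer
            return exception instanceof KafkaException;
        }
    }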
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 9a8c878..770335a 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -706,6 +706,23 @@ public interface KafkaComponentBuilderFactory {
             return this;
         }
         /**
+         * To use a custom strategy with the consumer to control how to handle
+         * exceptions thrown from the Kafka broker while polling messages.
+         * 
+         * The option is a:
+         * &lt;code&gt;org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy&lt;/code&gt; type.
+         * 
+         * Group: consumer (advanced)
+         * 
+         * @param kafkaConsumerReconnectExceptionStrategy the value to set
+         * @return the dsl builder
+         */
+        default KafkaComponentBuilder kafkaConsumerReconnectExceptionStrategy(
+                org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy kafkaConsumerReconnectExceptionStrategy) {
+            doSetProperty("kafkaConsumerReconnectExceptionStrategy", kafkaConsumerReconnectExceptionStrategy);
+            return this;
+        }
+        /**
          * Factory to use for creating KafkaManualCommit instances. This allows
          * to plugin a custom factory to create custom KafkaManualCommit
          * instances in case special logic is needed when doing manual commits
@@ -1930,6 +1947,7 @@ public interface KafkaComponentBuilderFactory {
             case "specificAvroReader": getOrCreateConfiguration((KafkaComponent) component).setSpecificAvroReader((boolean) value); return true;
             case "topicIsPattern": getOrCreateConfiguration((KafkaComponent) component).setTopicIsPattern((boolean) value); return true;
             case "valueDeserializer": getOrCreateConfiguration((KafkaComponent) component).setValueDeserializer((java.lang.String) value); return true;
+            case "kafkaConsumerReconnectExceptionStrategy": ((KafkaComponent) component).setKafkaConsumerReconnectExceptionStrategy((org.apache.camel.component.kafka.KafkaConsumerReconnectExceptionStrategy) value); return true;
             case "kafkaManualCommitFactory": ((KafkaComponent) component).setKafkaManualCommitFactory((org.apache.camel.component.kafka.KafkaManualCommitFactory) value); return true;
             case "bufferMemorySize": getOrCreateConfiguration((KafkaComponent) component).setBufferMemorySize((java.lang.Integer) value); return true;
             case "compressionCodec": getOrCreateConfiguration((KafkaComponent) component).setCompressionCodec((java.lang.String) value); return true;
diff --git a/docs/components/modules/ROOT/pages/kafka-component.adoc b/docs/components/modules/ROOT/pages/kafka-component.adoc
index c74f74c..dd5d130 100644
--- a/docs/components/modules/ROOT/pages/kafka-component.adoc
+++ b/docs/components/modules/ROOT/pages/kafka-component.adoc
@@ -43,7 +43,7 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 99 options, which are listed below.
+The Kafka component supports 100 options, which are listed below.
 
 
 
@@ -86,6 +86,7 @@ The Kafka component supports 99 options, which are listed below.
 | *specificAvroReader* (consumer) | This enables the use of a specific Avro reader for use with the Confluent Platform schema registry and the io.confluent.kafka.serializers.KafkaAvroDeserializer. This option is only available in the Confluent Platform (not standard Apache Kafka) | false | boolean
 | *topicIsPattern* (consumer) | Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic number of topics matching the pattern. | false | boolean
 | *valueDeserializer* (consumer) | Deserializer class for value that implements the Deserializer interface. | org.apache.kafka.common.serialization.StringDeserializer | String
+| *kafkaConsumerReconnect{zwsp}ExceptionStrategy* (consumer) | To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while polling messages. |  | KafkaConsumerReconnectExceptionStrategy
 | *kafkaManualCommitFactory* (consumer) | Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in case special logic is needed when doing manual commits that deviates from the default implementation that comes out of the box. |  | KafkaManualCommitFactory
 | *bufferMemorySize* (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additio [...]
 | *compressionCodec* (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none, gzip and snappy. There are 4 enums and the value can be one of: none, gzip, snappy, lz4 | none | String