You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/30 07:00:32 UTC

[camel-kafka-connector] 01/02: First attempt Camel Error Handling level

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch error-handling
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 9fa60d144d047cd3bcb04c336c736dbc98ef6263
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Oct 29 22:59:07 2020 +0100

    First attempt Camel Error Handling level
---
 .../camel/kafkaconnector/CamelConnectorConfig.java | 12 ++++++++
 .../kafkaconnector/CamelSinkConnectorConfig.java   |  5 +++-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  7 ++++-
 .../kafkaconnector/CamelSourceConnectorConfig.java |  5 +++-
 .../camel/kafkaconnector/CamelSourceTask.java      |  8 +++++-
 .../utils/CamelKafkaConnectMain.java               | 32 ++++++++++++++++++++++
 6 files changed, 65 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
index ae67309..4271938 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelConnectorConfig.java
@@ -35,6 +35,18 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
     public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF = "camel.aggregation.timeout";
     public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC = "The timeout of the aggregation, to be used in combination with camel.beans.aggregate";
 
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT = "default";
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_CONF = "camel.error.handler";
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_DOC = "The error handler to use: possible values are 'no' or 'default'";
+
+    public static final int CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT = 0;
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF = "camel.error.handler.max.redeliveries";
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC = "The maximum redeliveries to be used in case of Default Error Handler";
+
+    public static final Long CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT = 1000L;
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF = "camel.error.handler.redelivery.delay";
+    public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC = "The initial redelivery delay in milliseconds in case of Default Error Handler";
+    
     protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
         super(definition, originals, configProviderProps, doLog);
     }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 980aad1..e86e921 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -54,7 +54,10 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC)
         .define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC)
         .define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC)
-        .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC);
+        .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC)
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC)
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC)
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC);
 
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 08967cc..a38afea 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -82,7 +82,9 @@ public class CamelSinkTask extends SinkTask {
             final String unmarshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF);
             final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF);
             final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF);
-
+            final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
+            final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
+            final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
                 remoteUrl = TaskHelper.buildUrl(camelContext,
@@ -98,6 +100,9 @@ public class CamelSinkTask extends SinkTask {
                 .withMarshallDataFormat(marshaller)
                 .withAggregationSize(size)
                 .withAggregationTimeout(timeout)
+                .withErrorHandler(errorHandler)
+                .withMaxRedeliveries(maxRedeliveries)
+                .withRedeliveryDelay(redeliveryDelay)
                 .build(camelContext);
 
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index bdfc5c7..2ecfb45 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -90,7 +90,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
         .define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC)
         .define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC)
         .define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC)
-        .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC);
+        .define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC)
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC)
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC)
+        .define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC);
 
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index aef600f..165ffff 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -88,7 +88,10 @@ public class CamelSourceTask extends SourceTask {
             final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
             final int size = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF);
             final long timeout = config.getLong(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF);
-
+            final int maxRedeliveries = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
+            final long redeliveryDelay = config.getLong(CamelSourceConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
+            final String errorHandler = config.getString(CamelSourceConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
+            
             topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");
 
             String localUrl = getLocalUrlWithPollingOptions(config);
@@ -107,6 +110,9 @@ public class CamelSourceTask extends SourceTask {
                 .withMarshallDataFormat(marshaller)
                 .withAggregationSize(size)
                 .withAggregationTimeout(timeout)
+                .withErrorHandler(errorHandler)
+                .withMaxRedeliveries(maxRedeliveries)
+                .withRedeliveryDelay(redeliveryDelay)
                 .build(camelContext);
 
             consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
index 36ec56a..3dbf045 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java
@@ -91,6 +91,9 @@ public class CamelKafkaConnectMain extends SimpleMain {
         private String unmarshallDataFormat;
         private int aggregationSize;
         private long aggregationTimeout;
+        private String errorHandler;
+        private int maxRedeliveries;
+        private long redeliveryDelay;
 
         public Builder(String from, String to) {
             this.from = from;
@@ -121,6 +124,21 @@ public class CamelKafkaConnectMain extends SimpleMain {
             this.aggregationTimeout = aggregationTimeout;
             return this;
         }
+        
+        public Builder withErrorHandler(String errorHandler) {
+            this.errorHandler = errorHandler;
+            return this;
+        }
+        
+        public Builder withMaxRedeliveries(int maxRedeliveries) {
+            this.maxRedeliveries = maxRedeliveries;
+            return this;
+        }
+        
+        public Builder withRedeliveryDelay(long redeliveryDelay) {
+            this.redeliveryDelay = redeliveryDelay;
+            return this;
+        }
 
         public CamelKafkaConnectMain build(CamelContext camelContext) {
             CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
@@ -138,6 +156,20 @@ public class CamelKafkaConnectMain extends SimpleMain {
                     //from
                     RouteDefinition rd = from(from);
                     LOG.info("Creating Camel route from({})", from);
+                    
+                    // Use the value passed to withErrorHandler(...) instead of re-reading the raw props map.
+                    if (!ObjectHelper.isEmpty(errorHandler)) {
+                        switch (errorHandler) {
+                            case "no":
+                                rd.errorHandler(noErrorHandler());
+                                break;
+                            case "default":
+                                rd.errorHandler(defaultErrorHandler().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
+                                break;
+                            default:
+                                break;
+                        }
+                    }
 
                     //dataformats
                     if (!ObjectHelper.isEmpty(marshallDataFormat)) {